Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:25 UTC

[09/51] [partial] Initial commit of master branch from github

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PDateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PDateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PDateColumn.java
new file mode 100644
index 0000000..8ed6c4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PDateColumn.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+public abstract class PDateColumn extends PBaseColumn {
+    @Override
+    public PDataType getDataType() {
+        return PDataType.DATE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PDatum.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PDatum.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PDatum.java
new file mode 100644
index 0000000..e1946f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PDatum.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+
+public interface PDatum {
+    /**
+     * @return whether this column is nullable
+     */
+    boolean isNullable();
+
+    /**
+     * @return data type of the column
+     */
+    PDataType getDataType();
+
+    /**
+     * @return maximum byte length of the column
+     */
+    Integer getByteSize();
+
+    /**
+     * @return the actual length of the column. For decimal, it would be its precision. For char or
+     * varchar, it would be the maximum length as specified during schema definition.
+     */
+    Integer getMaxLength();
+
+    /**
+     * @return scale of a decimal number.
+     */
+    Integer getScale();
+    
+    /**
+     * @return The modifier for this column or null if it doesn't have a modifier
+     */
+    ColumnModifier getColumnModifier();
+}
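
As a rough illustration of the contract above, a hypothetical PDatum for a
nullable DECIMAL(10,2) column might look like the following sketch; the concrete
values are assumptions for illustration, not taken from the commit:

    // Hypothetical PDatum for a nullable DECIMAL(10,2) column.
    PDatum decimalDatum = new PDatum() {
        @Override public boolean isNullable() { return true; }
        @Override public PDataType getDataType() { return PDataType.DECIMAL; }
        @Override public Integer getByteSize() { return null; }   // variable length
        @Override public Integer getMaxLength() { return 10; }    // precision
        @Override public Integer getScale() { return 2; }         // scale
        @Override public ColumnModifier getColumnModifier() { return null; } // no modifier
    };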

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java
new file mode 100644
index 0000000..330c5c9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+
+public enum PIndexState {
+    BUILDING("b"),
+    USABLE("e"),
+    UNUSABLE("d"),
+    ACTIVE("a"),
+    INACTIVE("i"),
+    DISABLE("x"),
+    REBUILD("r");
+
+    private final String serializedValue;
+    private final byte[] serializedBytes;
+    private final byte[] nameBytesValue;
+
+    private PIndexState(String value) {
+        this.serializedValue = value;
+        this.serializedBytes = PDataType.VARCHAR.toBytes(value);
+        this.nameBytesValue = PDataType.VARCHAR.toBytes(this.toString());
+    }
+
+    public String getSerializedValue() {
+        return serializedValue;
+    }
+
+    public byte[] getSerializedBytes() {
+        return serializedBytes;
+    }
+
+    public byte[] toBytes() {
+        return nameBytesValue;
+    }
+
+    private static final PIndexState[] FROM_VALUE;
+    private static final int FROM_VALUE_OFFSET;
+    static {
+        int minChar = Integer.MAX_VALUE;
+        int maxChar = Integer.MIN_VALUE;
+        for (PIndexState state: PIndexState.values()) {
+            char c = state.getSerializedValue().charAt(0);
+            if (c < minChar) {
+                minChar = c;
+            }
+            if (c > maxChar) {
+                maxChar = c;
+            }
+        }
+        FROM_VALUE_OFFSET = minChar;
+        FROM_VALUE = new PIndexState[maxChar - minChar + 1];
+        for (PIndexState state: PIndexState.values()) {
+            FROM_VALUE[state.getSerializedValue().charAt(0) - minChar] = state;
+        }
+    }
+
+    public static PIndexState fromSerializedValue(String serializedValue) {
+        if (serializedValue.length() == 1) {
+            int i = serializedValue.charAt(0) - FROM_VALUE_OFFSET;
+            if (i >= 0 && i < FROM_VALUE.length && FROM_VALUE[i] != null) {
+                return FROM_VALUE[i];
+            }
+        }
+        throw new IllegalArgumentException("Unable to find PIndexState enum for serialized value of '" + serializedValue + "'");
+    }
+
+    public static PIndexState fromSerializedValue(byte serializedByte) {
+        int i = serializedByte - FROM_VALUE_OFFSET;
+        if (i >= 0 && i < FROM_VALUE.length && FROM_VALUE[i] != null) {
+            return FROM_VALUE[i];
+        }
+        throw new IllegalArgumentException("Unable to find PIndexState enum for serialized value of '" + (char)serializedByte + "'");
+    }
+}
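
The static block above builds a sparse lookup table indexed by the first
character of the serialized value minus the smallest such character, so both
fromSerializedValue overloads reduce to an O(1) array lookup. A quick sketch
using the values declared in the enum:

    PIndexState active = PIndexState.fromSerializedValue("a");
    assert active == PIndexState.ACTIVE;
    assert "a".equals(active.getSerializedValue());
    // The byte overload goes through the same char-indexed table.
    assert PIndexState.fromSerializedValue((byte) 'x') == PIndexState.DISABLE;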

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PIntegerColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIntegerColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIntegerColumn.java
new file mode 100644
index 0000000..5c00117
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIntegerColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+
+/**
+ * 
+ * Base class for PColumn implementors of type Integer.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class PIntegerColumn extends PBaseColumn {
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.INTEGER;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PLongColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PLongColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PLongColumn.java
new file mode 100644
index 0000000..69fd7e9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PLongColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+
+/**
+ * 
+ * Base class for PColumn implementors of type Long.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class PLongColumn extends PBaseColumn {
+    @Override
+    public final PDataType getDataType() {
+        return PDataType.LONG;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
new file mode 100644
index 0000000..1af5c4a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.util.Map;
+
+import org.apache.phoenix.query.MetaDataMutated;
+
+
+public interface PMetaData extends MetaDataMutated {
+    public PTable getTable(String name) throws TableNotFoundException;
+    public Map<String, PTable> getTables();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
new file mode 100644
index 0000000..cbe5d50
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class PMetaDataImpl implements PMetaData {
+    public static final PMetaData EMPTY_META_DATA = new PMetaDataImpl(Collections.<String,PTable>emptyMap());
+    private final Map<String,PTable> metaData;
+    
+    public PMetaDataImpl(Map<String,PTable> tables) {
+        this.metaData = ImmutableMap.copyOf(tables);
+    }
+    
+    @Override
+    public PTable getTable(String name) throws TableNotFoundException {
+        PTable table = metaData.get(name);
+        if (table == null) {
+            throw new TableNotFoundException(name);
+        }
+        return table;
+    }
+
+    @Override
+    public Map<String,PTable> getTables() {
+        return metaData;
+    }
+
+
+    @Override
+    public PMetaData addTable(PTable table) throws SQLException {
+        Map<String,PTable> tables = Maps.newHashMap(metaData);
+        PTable oldTable = tables.put(table.getName().getString(), table);
+        if (table.getParentName() != null) { // Upsert new index table into parent data table list
+            String parentName = table.getParentName().getString();
+            PTable parentTable = tables.get(parentName);
+            // If parentTable isn't cached, that's OK; we can skip this
+            if (parentTable != null) {
+                List<PTable> oldIndexes = parentTable.getIndexes();
+                List<PTable> newIndexes = Lists.newArrayListWithExpectedSize(oldIndexes.size() + 1);
+                newIndexes.addAll(oldIndexes);
+                if (oldTable != null) {
+                    newIndexes.remove(oldTable);
+                }
+                newIndexes.add(table);
+                tables.put(parentName, PTableImpl.makePTable(parentTable, table.getTimeStamp(), newIndexes));
+            }
+        }
+        for (PTable index : table.getIndexes()) {
+            tables.put(index.getName().getString(), index);
+        }
+        return new PMetaDataImpl(tables);
+    }
+
+    @Override
+    public PMetaData addColumn(String tableName, List<PColumn> columnsToAdd, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows) throws SQLException {
+        PTable table = getTable(tableName);
+        Map<String,PTable> tables = Maps.newHashMap(metaData);
+        List<PColumn> oldColumns = PTableImpl.getColumnsToClone(table);
+        List<PColumn> newColumns;
+        if (columnsToAdd.isEmpty()) {
+            newColumns = oldColumns;
+        } else {
+            newColumns = Lists.newArrayListWithExpectedSize(oldColumns.size() + columnsToAdd.size());
+            newColumns.addAll(oldColumns);
+            newColumns.addAll(columnsToAdd);
+        }
+        PTable newTable = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, newColumns, isImmutableRows);
+        tables.put(tableName, newTable);
+        return new PMetaDataImpl(tables);
+    }
+
+    @Override
+    public PMetaData removeTable(String tableName) throws SQLException {
+        PTable table;
+        Map<String,PTable> tables = Maps.newHashMap(metaData);
+        if ((table=tables.remove(tableName)) == null) {
+            throw new TableNotFoundException(tableName);
+        } else {
+            for (PTable index : table.getIndexes()) {
+                if (tables.remove(index.getName().getString()) == null) {
+                    throw new TableNotFoundException(index.getName().getString());
+                }
+            }
+        }
+        return new PMetaDataImpl(tables);
+    }
+    
+    @Override
+    public PMetaData removeColumn(String tableName, String familyName, String columnName, long tableTimeStamp, long tableSeqNum) throws SQLException {
+        PTable table = getTable(tableName);
+        Map<String,PTable> tables = Maps.newHashMap(metaData);
+        PColumn column;
+        if (familyName == null) {
+            column = table.getPKColumn(columnName);
+        } else {
+            column = table.getColumnFamily(familyName).getColumn(columnName);
+        }
+        int positionOffset = 0;
+        int position = column.getPosition();
+        List<PColumn> oldColumns = table.getColumns();
+        if (table.getBucketNum() != null) {
+            position--;
+            positionOffset = 1;
+            oldColumns = oldColumns.subList(positionOffset, oldColumns.size());
+        }
+        List<PColumn> columns = Lists.newArrayListWithExpectedSize(oldColumns.size() - 1);
+        columns.addAll(oldColumns.subList(0, position));
+        // Update the position of columns that follow the removed column
+        for (int i = position+1; i < oldColumns.size(); i++) {
+            PColumn oldColumn = oldColumns.get(i);
+            PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getColumnModifier(), oldColumn.getArraySize());
+            columns.add(newColumn);
+        }
+        
+        PTable newTable = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, columns);
+        tables.put(tableName, newTable);
+        return new PMetaDataImpl(tables);
+    }
+
+    public static PMetaData pruneNewerTables(long scn, PMetaData metaData) {
+        if (!hasNewerMetaData(scn, metaData)) {
+            return metaData;
+        }
+        Map<String,PTable> newTables = Maps.newHashMap(metaData.getTables());
+        Iterator<Map.Entry<String, PTable>> tableIterator = newTables.entrySet().iterator();
+        boolean wasModified = false;
+        while (tableIterator.hasNext()) {
+            PTable table = tableIterator.next().getValue();
+            if (table.getTimeStamp() >= scn && table.getType() != PTableType.SYSTEM) {
+                tableIterator.remove();
+                wasModified = true;
+            }
+        }
+    
+        if (wasModified) {
+            return new PMetaDataImpl(newTables);
+        }
+        return metaData;
+    }
+
+    private static boolean hasNewerMetaData(long scn, PMetaData metaData) {
+        for (PTable table : metaData.getTables().values()) {
+            if (table.getTimeStamp() >= scn) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    private static boolean hasMultiTenantMetaData(PMetaData metaData) {
+        for (PTable table : metaData.getTables().values()) {
+            if (table.isMultiTenant()) {
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    public static PMetaData pruneMultiTenant(PMetaData metaData) {
+        if (!hasMultiTenantMetaData(metaData)) {
+            return metaData;
+        }
+        Map<String,PTable> newTables = Maps.newHashMap(metaData.getTables());
+        Iterator<Map.Entry<String, PTable>> tableIterator = newTables.entrySet().iterator();
+        while (tableIterator.hasNext()) {
+            PTable table = tableIterator.next().getValue();
+            if (table.isMultiTenant()) {
+                tableIterator.remove();
+            }
+        }
+    
+        return new PMetaDataImpl(newTables);
+    }
+}
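
PMetaDataImpl is copy-on-write: every mutator copies the backing map and
returns a new instance, leaving the original untouched. A minimal sketch,
assuming someTable is an already-built PTable (hypothetical here):

    static void cacheExample(PTable someTable) throws Exception {
        PMetaData cache = PMetaDataImpl.EMPTY_META_DATA;
        PMetaData withTable = cache.addTable(someTable);  // new instance returned
        assert cache.getTables().isEmpty();               // original is unchanged
        PTable resolved = withTable.getTable(someTable.getName().getString());
        PMetaData withoutTable = withTable.removeTable(resolved.getName().getString());
        assert withoutTable.getTables().isEmpty();
    }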

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
new file mode 100644
index 0000000..97d8989
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+
+
+/**
+ * 
+ * Interface encapsulating both the client-side name
+ * and the server-side name of a named object.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface PName {
+    public static PName EMPTY_NAME = new PName() {
+        @Override
+        public String getString() {
+            return "";
+        }
+
+        @Override
+        public byte[] getBytes() {
+            return ByteUtil.EMPTY_BYTE_ARRAY;
+        }
+        
+        @Override
+        public String toString() {
+            return getString();
+        }
+
+        @Override
+        public ImmutableBytesPtr getBytesPtr() {
+            return ByteUtil.EMPTY_BYTE_ARRAY_PTR;
+        }
+    };
+    public static PName EMPTY_COLUMN_NAME = new PName() {
+        @Override
+        public String getString() {
+            return QueryConstants.EMPTY_COLUMN_NAME;
+        }
+
+        @Override
+        public byte[] getBytes() {
+            return QueryConstants.EMPTY_COLUMN_BYTES;
+        }
+        
+        @Override
+        public String toString() {
+            return getString();
+        }
+
+        @Override
+        public ImmutableBytesPtr getBytesPtr() {
+            return QueryConstants.EMPTY_COLUMN_BYTES_PTR;
+        }
+    };
+    /**
+     * Get the client-side, normalized name as referenced
+     * in a SQL statement.
+     * @return the normalized string name
+     */
+    String getString();
+    
+    /**
+     * Get the server-side name as referenced in HBase-related
+     * APIs such as Scan, Filter, etc.
+     * @return the name as a byte array
+     */
+    byte[] getBytes();
+
+    /**
+     * @return a pointer to the underlying bytes
+     */
+    ImmutableBytesPtr getBytesPtr();
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PNameFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PNameFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PNameFactory.java
new file mode 100644
index 0000000..cbea54c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PNameFactory.java
@@ -0,0 +1,23 @@
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.query.QueryConstants;
+
+public class PNameFactory {
+
+    private PNameFactory() {
+    }
+
+    public static PName newName(String name) {
+        return name == null || name.isEmpty() ? PName.EMPTY_NAME : 
+            name.equals(QueryConstants.EMPTY_COLUMN_NAME ) ?  PName.EMPTY_COLUMN_NAME : 
+                new PNameImpl(name);
+    }
+    
+    public static PName newName(byte[] bytes) {
+        return bytes == null || bytes.length == 0 ? PName.EMPTY_NAME : 
+            Bytes.compareTo(bytes, QueryConstants.EMPTY_COLUMN_BYTES) == 0 ? PName.EMPTY_COLUMN_NAME :
+                new PNameImpl(bytes);
+    }
+}
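
The factory interns the two well-known names, so callers can rely on reference
equality for the empty name and the empty column name; everything else gets a
fresh PNameImpl. For example (QueryConstants assumed on the classpath, as above):

    PName table = PNameFactory.newName("MY_TABLE");
    byte[] serverSide = table.getBytes();   // byte form used in HBase APIs
    assert PNameFactory.newName("") == PName.EMPTY_NAME;
    assert PNameFactory.newName((byte[]) null) == PName.EMPTY_NAME;
    assert PNameFactory.newName(QueryConstants.EMPTY_COLUMN_NAME) == PName.EMPTY_COLUMN_NAME;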

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PNameImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PNameImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PNameImpl.java
new file mode 100644
index 0000000..8feca89
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PNameImpl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.http.annotation.Immutable;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+
+@Immutable
+public class PNameImpl implements PName {
+    /**
+     */
+    private static class PNameImplData {
+        /** String form of the name. */
+        public String stringName;
+        /** UTF-8 byte form of the name. */
+        public byte[] bytesName;
+        /** Pointer wrapping the name bytes, created lazily by getBytesPtr(). */
+        public ImmutableBytesPtr ptr;
+
+        /**
+         *
+         */
+        public PNameImplData() {
+        }
+    }
+    private PNameImplData data = new PNameImplData();
+
+    PNameImpl(String name) {
+        this.data.stringName = name;
+        this.data.bytesName = Bytes.toBytes(name);
+    }
+
+    PNameImpl(byte[] name) {
+        this.data.stringName = Bytes.toString(name);
+        this.data.bytesName = name;
+    }
+
+    @Override
+    public String getString() {
+        return data.stringName;
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return data.bytesName;
+    }
+
+    @Override
+    public ImmutableBytesPtr getBytesPtr() {
+        if (data.ptr == null) {
+            synchronized (data.bytesName) {
+                if (data.ptr == null) {
+                    this.data.ptr = new ImmutableBytesPtr(data.bytesName);
+                }
+            }
+        }
+        return this.data.ptr;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + data.stringName.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        PNameImpl other = (PNameImpl) obj;
+        // Compare normalized stringName for equality, since bytesName
+        // may differ because it remains case sensitive.
+        if (!data.stringName.equals(other.data.stringName)) return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return data.stringName;
+    }
+}
\ No newline at end of file
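
getBytesPtr() creates the ImmutableBytesPtr lazily on first use and caches it,
so repeated calls hand back the same instance. A small sketch:

    PName name = PNameFactory.newName("T1");
    ImmutableBytesPtr p1 = name.getBytesPtr();
    ImmutableBytesPtr p2 = name.getBytesPtr();
    assert p1 == p2;   // the same lazily created pointer on subsequent calls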

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PNormalizedName.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PNormalizedName.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PNormalizedName.java
new file mode 100644
index 0000000..4876de9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PNormalizedName.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.http.annotation.Immutable;
+
+import org.apache.phoenix.util.SchemaUtil;
+
+
+@Immutable
+public class PNormalizedName extends PNameImpl {
+    
+    public PNormalizedName(String nonNormalizedName) {
+        super(SchemaUtil.normalizeIdentifier(nonNormalizedName));
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + getString().hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        PNormalizedName other = (PNormalizedName)obj;
+        // Compare normalized stringName for equality, since bytesName
+        // may differ because it remains case sensitive.
+        if (!getString().equals(other.getString())) return false;
+        return true;
+    }
+}
\ No newline at end of file
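
PNormalizedName simply normalizes the identifier up front, so equality and
hashing work on the normalized string. A sketch, assuming the usual Phoenix
rule that SchemaUtil.normalizeIdentifier upper-cases unquoted identifiers:

    PName col = new PNormalizedName("my_col");
    assert "MY_COL".equals(col.getString());
    assert col.equals(new PNormalizedName("My_Col"));  // equality on the normalized form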

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
new file mode 100644
index 0000000..22decfc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PRow.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Mutation;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * 
+ * Provides a client API for updating rows. Updates are processed in the
+ * order in which they are made. Calling setValue after calling delete
+ * will cause the delete to be canceled. Conversely, calling delete after
+ * calling setValue will cause all prior setValue calls to be canceled.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface PRow {
+    Map<PColumn, byte[]> DELETE_MARKER = ImmutableMap.of();
+
+    /**
+     * Get the list of {@link org.apache.hadoop.hbase.client.Mutation} used to
+     * update an HTable after all mutations through calls to
+     * {@link #setValue(PColumn, Object)} or {@link #delete()}.
+     * @return the list of mutations representing all changes made to a row
+     * @throws ConstraintViolationException if row data violates schema
+     * constraint
+     */
+    public List<Mutation> toRowMutations();
+    
+    /**
+     * Set a column value in the row
+     * @param col the column for which the value is being set
+     * @param value the value
+     * @throws ConstraintViolationException if row data violates schema
+     * constraint
+     */
+    public void setValue(PColumn col, Object value);
+    
+    /**
+     * Set a column value in the row
+     * @param col the column for which the value is being set
+     * @param value the value
+     * @throws ConstraintViolationException if row data violates schema
+     * constraint
+     */
+    public void setValue(PColumn col, byte[] value);
+    
+    /**
+     * Delete the row. Note that a delete takes precedence over any
+     * values that may have been set before the delete call.
+     */
+    public void delete();
+}
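
Following the class comment above (the most recent call wins), a hedged usage
sketch with hypothetical table, builder, key, and quantityColumn objects:

    PRow row = table.newRow(builder, key);            // timestamp assigned by the server
    row.setValue(quantityColumn, 10);                 // queue a column update
    row.delete();                                     // the later delete cancels the setValue
    List<Mutation> mutations = row.toRowMutations();  // carries only the row delete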

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PStringColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PStringColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PStringColumn.java
new file mode 100644
index 0000000..606b541
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PStringColumn.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+/**
+ * 
+ * Abstract class for columns of type {@link org.apache.phoenix.schema.PDataType#VARCHAR}
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public abstract class PStringColumn extends PBaseColumn {
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.VARCHAR;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
new file mode 100644
index 0000000..10a568e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.client.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.schema.stat.PTableStats;
+
+
+/**
+ * Definition of a Phoenix table
+ *
+ * @author wmacklem,jtaylor
+ * @since 0.1
+ */
+public interface PTable extends Writable {
+    public static final long INITIAL_SEQ_NUM = 0;
+    public static final String IS_IMMUTABLE_ROWS_PROP_NAME = "IMMUTABLE_ROWS";
+    public static final boolean DEFAULT_DISABLE_WAL = false;
+    
+    public enum ViewType { 
+        MAPPED((byte)1),
+        READ_ONLY((byte)2),
+        UPDATABLE((byte)3);
+
+        private final byte[] byteValue;
+        private final byte serializedValue;
+        
+        ViewType(byte serializedValue) {
+            this.serializedValue = serializedValue;
+            this.byteValue = Bytes.toBytes(this.name());
+        }
+        
+        public byte[] getBytes() {
+            return byteValue;
+        }
+        
+        public boolean isReadOnly() {
+            return this != UPDATABLE;
+        }
+        
+        public byte getSerializedValue() {
+            return this.serializedValue;
+        }
+        
+        public static ViewType fromSerializedValue(byte serializedValue) {
+            if (serializedValue < 1 || serializedValue > ViewType.values().length) {
+                throw new IllegalArgumentException("Invalid ViewType " + serializedValue);
+            }
+            return ViewType.values()[serializedValue-1];
+        }
+        
+        public ViewType combine(ViewType otherType) {
+            if (otherType == null) {
+                return this;
+            }
+            if (this == UPDATABLE && otherType == UPDATABLE) {
+                return UPDATABLE;
+            }
+            return READ_ONLY;
+        }
+    }
+
+    public enum LinkType {
+        /**
+         * Link from a table to its index table
+         */
+        INDEX_TABLE((byte)1),
+        /**
+         * Link from a view to its physical table
+         */
+        PHYSICAL_TABLE((byte)2);
+
+        private final byte[] byteValue;
+        private final byte serializedValue;
+        
+        LinkType(byte serializedValue) {
+            this.serializedValue = serializedValue;
+            this.byteValue = Bytes.toBytes(this.name());
+        }
+        
+        public byte[] getBytes() {
+            return byteValue;
+        }
+        
+        public byte getSerializedValue() {
+            return this.serializedValue;
+        }
+        
+        public static LinkType fromSerializedValue(byte serializedValue) {
+            if (serializedValue < 1 || serializedValue > LinkType.values().length) {
+                return null;
+            }
+            return LinkType.values()[serializedValue-1];
+        }
+    }
+
+    long getTimeStamp();
+    long getSequenceNumber();
+    /**
+     * @return table name
+     */
+    PName getName();
+    PName getSchemaName(); 
+    PName getTableName(); 
+
+    /**
+     * @return the table type
+     */
+    PTableType getType();
+
+    PName getPKName();
+
+    /**
+     * Get the PK columns ordered by position.
+     * @return a list of the PK columns
+     */
+    List<PColumn> getPKColumns();
+
+    /**
+     * Get all columns ordered by position.
+     * @return a list of all columns
+     */
+    List<PColumn> getColumns();
+
+    /**
+     * @return A list of the column families of this table
+     *  ordered by position.
+     */
+    List<PColumnFamily> getColumnFamilies();
+
+    /**
+     * Get the column family with the given name
+     * @param family the column family name
+     * @return the PColumnFamily with the given name
+     * @throws ColumnFamilyNotFoundException if the column family cannot be found
+     */
+    PColumnFamily getColumnFamily(byte[] family) throws ColumnFamilyNotFoundException;
+
+    PColumnFamily getColumnFamily(String family) throws ColumnFamilyNotFoundException;
+
+    /**
+     * Get the column with the given string name.
+     * @param name the column name
+     * @return the PColumn with the given name
+     * @throws ColumnNotFoundException if no column with the given name
+     * can be found
+     * @throws AmbiguousColumnException if multiple columns are found with the given name
+     */
+    PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException;
+
+    /**
+     * Get the PK column with the given name.
+     * @param name the column name
+     * @return the PColumn with the given name
+     * @throws ColumnNotFoundException if no PK column with the given name
+     * can be found
+     * @throws ColumnNotFoundException 
+     */
+    PColumn getPKColumn(String name) throws ColumnNotFoundException;
+
+    /**
+     * Creates a new row at the specified timestamp using the key
+     * for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])})
+     * and the optional key values specified using values.
+     * @param ts the timestamp that the key value will have when committed
+     * @param key the row key of the key value
+     * @param values the optional key values
+     * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
+     * generate the Row to send to the HBase server.
+     * @throws ConstraintViolationException if row data violates schema
+     * constraint
+     */
+    PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, byte[]... values);
+
+    /**
+     * Creates a new row for the PK values (from {@link #newKey(ImmutableBytesWritable, byte[][])})
+     * and the optional key values specified using values. The timestamp of the key value
+     * will be set by the HBase server.
+     * @param key the row key of the key value
+     * @param values the optional key values
+     * @return the new row. Use {@link org.apache.phoenix.schema.PRow#toRowMutations()} to
+     * generate the row to send to the HBase server.
+     * @throws ConstraintViolationException if row data violates schema
+     * constraint
+     */
+    PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values);
+
+    /**
+     * Formulates a row key using the values provided. The values must be in
+     * the same order as {@link #getPKColumns()}.
+     * @param key bytes pointer that will be filled in with the row key
+     * @param values the PK column values
+     * @return the number of values that were used from values to set
+     * the row key
+     */
+    int newKey(ImmutableBytesWritable key, byte[][] values);
+
+    /**
+     * Return the statistics table associated with this PTable.
+     * @return the statistics table.
+     */
+    PTableStats getTableStats();
+
+    RowKeySchema getRowKeySchema();
+
+    /**
+     * Return the number of buckets used by this table for salting. If the table does
+     * not use salting, returns null.
+     * @return number of buckets used by this table for salting, or null if salting is not used.
+     */
+    Integer getBucketNum();
+
+    /**
+     * Return the list of indexes defined on this table.
+     * @return the list of indexes.
+     */
+    List<PTable> getIndexes();
+
+    /**
+     * For a table of index type, return the state of the table.
+     * @return the state of the index.
+     */
+    PIndexState getIndexState();
+
+    /**
+     * Gets the full name of the data table for an index table.
+     * @return the name of the data table that this index is on
+     *  or null if not an index.
+     */
+    PName getParentName();
+    /**
+     * Gets the table name of the data table for an index table.
+     * @return the table name of the data table that this index is
+     * on or null if not an index.
+     */
+    PName getParentTableName();
+    
+    /**
+     * For a view, return the name of the table in Phoenix that physically stores the data.
+     * Currently a single name, but when views are allowed over multiple tables, will become multi-valued.
+     * @return the name of the physical table storing the data.
+     */
+    public List<PName> getPhysicalNames();
+
+    PName getPhysicalName();
+    boolean isImmutableRows();
+
+    void getIndexMaintainers(ImmutableBytesWritable ptr);
+    IndexMaintainer getIndexMaintainer(PTable dataTable);
+    PName getDefaultFamilyName();
+    
+    boolean isWALDisabled();
+    boolean isMultiTenant();
+
+    ViewType getViewType();
+    String getViewStatement();
+}
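
Putting newKey and newRow together, a hedged sketch of building a row against a
two-column primary key; table, builder, and the PK values are hypothetical:

    ImmutableBytesWritable key = new ImmutableBytesWritable();
    byte[][] pkValues = new byte[][] {
            PDataType.VARCHAR.toBytes("org1"),
            PDataType.INTEGER.toBytes(42) };
    int used = table.newKey(key, pkValues);           // fills key in PK column order
    PRow row = table.newRow(builder, key);            // timestamp set by the HBase server
    List<Mutation> mutations = row.toRowMutations();  // mutations to send to HBase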

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
new file mode 100644
index 0000000..11d6c94
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -0,0 +1,863 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import static org.apache.phoenix.client.KeyValueBuilder.addQuietly;
+import static org.apache.phoenix.client.KeyValueBuilder.deleteQuietly;
+import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE;
+import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.client.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
+import org.apache.phoenix.schema.stat.PTableStats;
+import org.apache.phoenix.schema.stat.PTableStatsImpl;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+
+/**
+ * 
+ * Base class for PTable implementors.  Provides abstraction for
+ * storing data in a single column (ColumnLayout.SINGLE) or in
+ * multiple columns (ColumnLayout.MULTI).
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class PTableImpl implements PTable {
+    private static final Integer NO_SALTING = -1;
+    
+    private PName name;
+    private PName schemaName;
+    private PName tableName;
+    private PTableType type;
+    private PIndexState state;
+    private long sequenceNumber;
+    private long timeStamp;
+    // Have MultiMap for String->PColumn (may need family qualifier)
+    private List<PColumn> pkColumns;
+    private List<PColumn> allColumns;
+    private List<PColumnFamily> families;
+    private Map<byte[], PColumnFamily> familyByBytes;
+    private Map<String, PColumnFamily> familyByString;
+    private ListMultimap<String,PColumn> columnsByName;
+    private PName pkName;
+    private Integer bucketNum;
+    // Statistics associated with this table.
+    private PTableStats stats;
+    private RowKeySchema rowKeySchema;
+    // Indexes associated with this table.
+    private List<PTable> indexes;
+    // Data table name that the index is created on.
+    private PName parentName;
+    private PName parentTableName;
+    private List<PName> physicalNames;
+    private boolean isImmutableRows;
+    private IndexMaintainer indexMaintainer;
+    private ImmutableBytesWritable indexMaintainersPtr;
+    private PName defaultFamilyName;
+    private String viewStatement;
+    private boolean disableWAL;
+    private boolean multiTenant;
+    private ViewType viewType;
+    
+    public PTableImpl() {
+    }
+
+    public PTableImpl(PName name) { // For finding table ref
+        this.name = name;
+    }
+
+    public PTableImpl(String schemaName, String tableName, long timestamp, List<PColumnFamily> families) { // For base table of mapped VIEW
+        this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, tableName));
+        this.schemaName = PNameFactory.newName(schemaName);
+        this.tableName = PNameFactory.newName(tableName);
+        this.type = PTableType.VIEW;
+        this.viewType = ViewType.MAPPED;
+        this.timeStamp = timestamp;
+        this.pkColumns = this.allColumns = Collections.emptyList();
+        this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
+        this.indexes = Collections.emptyList();
+        this.familyByBytes = Maps.newHashMapWithExpectedSize(families.size());
+        this.familyByString = Maps.newHashMapWithExpectedSize(families.size());
+        for (PColumnFamily family : families) {
+            familyByBytes.put(family.getName().getBytes(), family);
+            familyByString.put(family.getName().getString(), family);
+        }
+        this.families = families;
+    }
+
+    public PTableImpl(long timeStamp) { // For delete marker
+        this(timeStamp, false);
+    }
+
+    public PTableImpl(long timeStamp, boolean isIndex) { // For index delete marker
+        if (isIndex) {
+            this.type = PTableType.INDEX;
+            this.state = PIndexState.INACTIVE;
+        } else {
+            this.type = PTableType.TABLE;
+        }
+        this.timeStamp = timeStamp;
+        this.pkColumns = this.allColumns = Collections.emptyList();
+        this.families = Collections.emptyList();
+        this.familyByBytes = Collections.emptyMap();
+        this.familyByString = Collections.emptyMap();
+        this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
+        this.indexes = Collections.emptyList();
+    }
+
+    // When cloning a table, ignore the salt column as it will be added back in the constructor
+    public static List<PColumn> getColumnsToClone(PTable table) {
+        return table.getBucketNum() == null ? table.getColumns() : table.getColumns().subList(1, table.getColumns().size());
+    }
+    
+    public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes) throws SQLException {
+        return new PTableImpl(
+                table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, table.getSequenceNumber() + 1, 
+                table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentTableName(), indexes, table.isImmutableRows(),
+                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+    }
+
+    public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
+        return new PTableImpl(
+                table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), 
+                table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(),
+                table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+    }
+
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
+        return new PTableImpl(
+                table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, 
+                table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(),
+                table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+    }
+
+    public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
+        return new PTableImpl(
+                table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, 
+                table.getPKName(), table.getBucketNum(), columns, table.getParentTableName(), table.getIndexes(), isImmutableRows,
+                table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+    }
+
+    public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
+        return new PTableImpl(
+                table.getSchemaName(), table.getTableName(), table.getType(), state, table.getTimeStamp(), table.getSequenceNumber(), 
+                table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentTableName(), 
+                table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(),
+                table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getViewType());
+    }
+
+    public static PTableImpl makePTable(PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName,
+            Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames,
+            PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType) throws SQLException {
+        return new PTableImpl(schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataTableName, indexes,
+                isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType);
+    }
+
+    private PTableImpl(PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName,
+            Integer bucketNum, List<PColumn> columns, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames,
+            PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType) throws SQLException {
+        init(schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, new PTableStatsImpl(),
+                dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, viewType);
+    }
+
+    @Override
+    public boolean isMultiTenant() {
+        return multiTenant;
+    }
+    
+    @Override
+    public ViewType getViewType() {
+        return viewType;
+    }
+    
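+    // Shared initialization used by the constructors and readFields(): validates the columns, separates
+    // PK columns from column family columns, and builds the row key schema and the family lookup maps.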
+    private void init(PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber, PName pkName,
+            Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
+            List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, ViewType viewType) throws SQLException {
+        if (schemaName == null) {
+            throw new NullPointerException();
+        }
+        this.schemaName = schemaName;
+        this.tableName = tableName;
+        this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName.getString(), tableName.getString()));
+        this.type = type;
+        this.state = state;
+        this.timeStamp = timeStamp;
+        this.sequenceNumber = sequenceNumber;
+        this.pkName = pkName;
+        this.isImmutableRows = isImmutableRows;
+        this.defaultFamilyName = defaultFamilyName;
+        this.viewStatement = viewExpression;
+        this.disableWAL = disableWAL;
+        this.multiTenant = multiTenant;
+        this.viewType = viewType;
+        List<PColumn> pkColumns;
+        PColumn[] allColumns;
+
+        this.columnsByName = ArrayListMultimap.create(columns.size(), 1);
+        if (bucketNum != null) {
+            // Add salt column to allColumns and pkColumns, but don't add to
+            // columnsByName, since it should not be addressable via name.
+            allColumns = new PColumn[columns.size()+1];
+            allColumns[SALTING_COLUMN.getPosition()] = SALTING_COLUMN;
+            pkColumns = Lists.newArrayListWithExpectedSize(columns.size()+1);
+            pkColumns.add(SALTING_COLUMN);
+        } else {
+            allColumns = new PColumn[columns.size()];
+            pkColumns = Lists.newArrayListWithExpectedSize(columns.size());
+        }
+        for (int i = 0; i < columns.size(); i++) {
+            PColumn column = columns.get(i);
+            allColumns[column.getPosition()] = column;
+            PName familyName = column.getFamilyName();
+            if (familyName == null) {
+                pkColumns.add(column);
+            }
+            String columnName = column.getName().getString();
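+            // Flag a duplicate column name within the same column family (or within the PK when the family is null)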
+            if (columnsByName.put(columnName, column)) {
+                int count = 0;
+                for (PColumn dupColumn : columnsByName.get(columnName)) {
+                    if (Objects.equal(familyName, dupColumn.getFamilyName())) {
+                        count++;
+                        if (count > 1) {
+                            throw new ColumnAlreadyExistsException(null, name.getString(), columnName);
+                        }
+                    }
+                }
+            }
+        }
+        this.bucketNum = bucketNum;
+        this.pkColumns = ImmutableList.copyOf(pkColumns);
+        this.allColumns = ImmutableList.copyOf(allColumns);
+        
+        RowKeySchemaBuilder builder = new RowKeySchemaBuilder(pkColumns.size());
+        // Two passes so that column order within column families matches overall column order
+        // and so that column family order is constant
+        int maxExpectedSize = allColumns.length - pkColumns.size();
+        // Maintain iteration order so that column families are ordered as they are listed
+        Map<PName, List<PColumn>> familyMap = Maps.newLinkedHashMap();
+        for (PColumn column : allColumns) {
+            PName familyName = column.getFamilyName();
+            if (familyName == null) {
+                builder.addField(column, column.isNullable(), column.getColumnModifier());
+            } else {
+                List<PColumn> columnsInFamily = familyMap.get(familyName);
+                if (columnsInFamily == null) {
+                    columnsInFamily = Lists.newArrayListWithExpectedSize(maxExpectedSize);
+                    familyMap.put(familyName, columnsInFamily);
+                }
+                columnsInFamily.add(column);
+            }
+        }
+        
+        this.rowKeySchema = builder.build();
+        Iterator<Map.Entry<PName,List<PColumn>>> iterator = familyMap.entrySet().iterator();
+        PColumnFamily[] families = new PColumnFamily[familyMap.size()];
+        ImmutableMap.Builder<String, PColumnFamily> familyByString = ImmutableMap.builder();
+        ImmutableSortedMap.Builder<byte[], PColumnFamily> familyByBytes = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        for (int i = 0; i < families.length; i++) {
+            Map.Entry<PName,List<PColumn>> entry = iterator.next();
+            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue());
+            families[i] = family;
+            familyByString.put(family.getName().getString(), family);
+            familyByBytes.put(family.getName().getBytes(), family);
+        }
+        this.families = ImmutableList.copyOf(families);
+        this.familyByBytes = familyByBytes.build();
+        this.familyByString = familyByString.build();
+        this.stats = stats;
+        this.indexes = indexes;
+        this.parentTableName = parentTableName;
+        this.parentName = parentTableName == null ? null : PNameFactory.newName(SchemaUtil.getTableName(schemaName.getString(), parentTableName.getString()));
+        this.physicalNames = physicalNames == null ? ImmutableList.<PName>of() : ImmutableList.copyOf(physicalNames);
+    }
+
+    @Override
+    public boolean isImmutableRows() {
+        return isImmutableRows;
+    }
+    
+    @Override
+    public String toString() {
+        return name.getString();
+    }
+
+    @Override
+    public List<PColumn> getPKColumns() {
+        return pkColumns;
+    }
+
+    @Override
+    public final PName getName() {
+        return name;
+    }
+
+    @Override
+    public final PName getSchemaName() {
+        return schemaName;
+    }
+
+    @Override
+    public final PName getTableName() {
+        return tableName;
+    }
+
+    @Override
+    public final PTableType getType() {
+        return type;
+    }
+
+    @Override
+    public final List<PColumnFamily> getColumnFamilies() {
+        return families;
+    }
+
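+    // Forms the row key from the given PK values: reserves a placeholder for the salt byte when the
+    // table is salted, separates variable-length values with a zero byte, pads fixed-width values to
+    // the column's byte size (rejecting values that exceed it), and returns the number of key slots written.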
+    @Override
+    public int newKey(ImmutableBytesWritable key, byte[][] values) {
+        int nValues = values.length;
+        while (nValues > 0 && (values[nValues-1] == null || values[nValues-1].length == 0)) {
+            nValues--;
+        }
+        int i = 0;
+        TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(SchemaUtil.estimateKeyLength(this));
+        try {
+            Integer bucketNum = this.getBucketNum();
+            if (bucketNum != null) {
+                // Write a placeholder for the salt byte
+                i++;
+                os.write(QueryConstants.SEPARATOR_BYTE_ARRAY);
+            }
+            List<PColumn> columns = getPKColumns();
+            int nColumns = columns.size();
+            PDataType type = null;
+            while (i < nValues && i < nColumns) {
+                // Separate variable-length column values in the key with a zero byte
+                if (type != null && !type.isFixedWidth()) {
+                    os.write(SEPARATOR_BYTE);
+                }
+                PColumn column = columns.get(i);
+                type = column.getDataType();
+                // This will throw if the value is null and the type doesn't allow null
+                byte[] byteValue = values[i++];
+                if (byteValue == null) {
+                    byteValue = ByteUtil.EMPTY_BYTE_ARRAY;
+                }
+                // An empty byte array return value means null. Do this,
+                // since a type may have multiple representations of null.
+                // For example, VARCHAR treats both null and an empty string
+                // as null. This way we don't need to leak that part of the
+                // implementation outside of PDataType by checking the value
+                // here.
+                if (byteValue.length == 0 && !column.isNullable()) { 
+                    throw new ConstraintViolationException(name.getString() + "." + column.getName().getString() + " may not be null");
+                }
+                Integer byteSize = column.getByteSize();
+                if (byteSize != null && type.isFixedWidth() && byteValue.length <= byteSize) {
+                    byteValue = StringUtil.padChar(byteValue, byteSize);
+                } else if (byteSize != null && byteValue.length > byteSize) {
+                    throw new ConstraintViolationException(name.getString() + "." + column.getName().getString() + " may not exceed " + byteSize + " bytes (" + SchemaUtil.toString(type, byteValue) + ")");
+                }
+                os.write(byteValue, 0, byteValue.length);
+            }
+            // If a required (fixed-width or non-nullable) PK value wasn't set, then throw
+            if (i < nColumns) {
+                PColumn column = columns.get(i);
+                type = column.getDataType();
+                if (type.isFixedWidth() || !column.isNullable()) {
+                    throw new ConstraintViolationException(name.getString() + "." + column.getName().getString() + " may not be null");
+                }
+            }
+            byte[] buf = os.getBuffer();
+            int size = os.size();
+            if (bucketNum != null) {
+                buf[0] = SaltingUtil.getSaltingByte(buf, 1, size-1, bucketNum);
+            }
+            key.set(buf, 0, size);
+            return i;
+        } finally {
+            try {
+                os.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+    }
+
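+    // Builds a PRow for the given key and assigns the remaining values, in order, to the non-PK
+    // columns across the column families.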
+    private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, byte[]... values) {
+        PRow row = new PRowImpl(builder, key, ts, getBucketNum());
+        if (i < values.length) {
+            for (PColumnFamily family : getColumnFamilies()) {
+                for (PColumn column : family.getColumns()) {
+                    row.setValue(column, values[i++]);
+                    if (i == values.length)
+                        return row;
+                }
+            }
+        }
+        return row;
+    }
+
+    @Override
+    public PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key,
+            byte[]... values) {
+        return newRow(builder, ts, key, 0, values);
+    }
+
+    @Override
+    public PRow newRow(KeyValueBuilder builder, ImmutableBytesWritable key, byte[]... values) {
+        return newRow(builder, HConstants.LATEST_TIMESTAMP, key, values);
+    }
+
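+    // Looks up a column by name; when the name is ambiguous across families, the column in the
+    // default column family wins, otherwise an AmbiguousColumnException is thrown.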
+    @Override
+    public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+        List<PColumn> columns = columnsByName.get(name);
+        int size = columns.size();
+        if (size == 0) {
+            throw new ColumnNotFoundException(name);
+        }
+        if (size > 1) {
+            for (PColumn column : columns) {
+                if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(column.getFamilyName().getString())) {
+                    // Allow ambiguity with default column, since a user would not know how to prefix it.
+                    return column;
+                }
+            }
+            throw new AmbiguousColumnException(name);
+        }
+        return columns.get(0);
+    }
+
+    /**
+     * 
+     * PRow implementation for ColumnLayout.MULTI mode, which stores column
+     * values across multiple HBase columns.
+     *
+     * @author jtaylor
+     * @since 0.1
+     */
+    private class PRowImpl implements PRow {
+        private final byte[] key;
+        private final ImmutableBytesWritable keyPtr;
+        // default to the generic builder, and only override when we know we are on the client
+        private final KeyValueBuilder kvBuilder;
+
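+        // Pending mutations for this row: puts for set values, deletes for unset values, and a
+        // whole-row delete that, when present, supersedes the others (see toRowMutations()).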
+        private Put setValues;
+        private Delete unsetValues;
+        private Delete deleteRow;
+        private final long ts;
+
+        public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) {
+            this.kvBuilder = kvBuilder;
+            this.ts = ts;
+            if (bucketNum != null) {
+                this.key = SaltingUtil.getSaltedKey(key, bucketNum);
+                this.keyPtr = new ImmutableBytesPtr(this.key);
+            } else {
+                this.keyPtr =  new ImmutableBytesPtr(key);
+                this.key = ByteUtil.copyKeyBytesIfNecessary(key);
+            }
+
+            newMutations();
+        }
+        
+        @SuppressWarnings("deprecation")
+        private void newMutations() {
+            this.setValues = new Put(this.key);
+            this.unsetValues = new Delete(this.key);
+            this.setValues.setWriteToWAL(!isWALDisabled());
+            this.unsetValues.setWriteToWAL(!isWALDisabled());
+        }
+
+        @Override
+        public List<Mutation> toRowMutations() {
+            List<Mutation> mutations = new ArrayList<Mutation>(3);
+            if (deleteRow != null) {
+                // Include only deleteRow mutation if present because it takes precedence over all others
+                mutations.add(deleteRow);
+            } else {
+                // Because we cannot enforce a not-null constraint on a KV column (since we don't know if the row exists when
+                // we upsert it), we instead add a KV that is always empty. This allows us to imitate SQL semantics given the
+                // way HBase works.
+                addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
+                    SchemaUtil.getEmptyColumnFamilyPtr(getColumnFamilies()),
+                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts, ByteUtil.EMPTY_BYTE_ARRAY_PTR));
+                mutations.add(setValues);
+                if (!unsetValues.isEmpty()) {
+                    mutations.add(unsetValues);
+                }
+            }
+            return mutations;
+        }
+
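+        // Removes any pending KeyValue for the given family/qualifier from the mutation so that a
+        // later set or unset of the same column replaces the earlier one.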
+        private void removeIfPresent(Mutation m, byte[] family, byte[] qualifier) {
+            Map<byte[],List<KeyValue>> familyMap = m.getFamilyMap();
+            List<KeyValue> kvs = familyMap.get(family);
+            if (kvs != null) {
+                Iterator<KeyValue> iterator = kvs.iterator();
+                while (iterator.hasNext()) {
+                    KeyValue kv = iterator.next();
+                    if (Bytes.compareTo(kv.getQualifier(), qualifier) == 0) {
+                        iterator.remove();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void setValue(PColumn column, Object value) {
+            byte[] byteValue = value == null ? ByteUtil.EMPTY_BYTE_ARRAY : column.getDataType().toBytes(value);
+            setValue(column, byteValue);
+        }
+
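+        // A null or empty value clears the column via a Delete (after enforcing nullability); otherwise
+        // the value is padded or size-checked against the column's byte size and added to the Put.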
+        @Override
+        public void setValue(PColumn column, byte[] byteValue) {
+            deleteRow = null;
+            byte[] family = column.getFamilyName().getBytes();
+            byte[] qualifier = column.getName().getBytes();
+            PDataType type = column.getDataType();
+            // Check null, since some types have no byte representation for null
+            if (byteValue == null || byteValue.length == 0) {
+                if (!column.isNullable()) { 
+                    throw new ConstraintViolationException(name.getString() + "." + column.getName().getString() + " may not be null");
+                }
+                removeIfPresent(setValues, family, qualifier);
+                deleteQuietly(unsetValues, kvBuilder, kvBuilder.buildDeleteColumns(keyPtr, column
+                        .getFamilyName().getBytesPtr(), column.getName().getBytesPtr(), ts));
+            } else {
+                Integer byteSize = column.getByteSize();
+                if (byteSize != null && type.isFixedWidth() && byteValue.length <= byteSize) {
+                    byteValue = StringUtil.padChar(byteValue, byteSize);
+                } else if (byteSize != null && byteValue.length > byteSize) {
+                    throw new ConstraintViolationException(name.getString() + "." + column.getName().getString() + " may not exceed " + byteSize + " bytes (" + type.toObject(byteValue) + ")");
+                }
+                removeIfPresent(unsetValues, family, qualifier);
+                addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr, column.getFamilyName()
+                        .getBytesPtr(),
+                        column.getName().getBytesPtr(), ts, new ImmutableBytesPtr(byteValue)));
+            }
+        }
+        
+        @SuppressWarnings("deprecation")
+        @Override
+        public void delete() {
+            newMutations();
+            // FIXME: the version of the Delete constructor without the lock args was introduced
+            // in 0.94.4, so if we used it here we could no longer support the 0.94.2 version
+            // of the client.
+            Delete delete = new Delete(key, ts, null);
+            deleteRow = delete;
+            deleteRow.setWriteToWAL(!isWALDisabled());
+        }
+    }
+
+    @Override
+    public PColumnFamily getColumnFamily(String familyName) throws ColumnFamilyNotFoundException {
+        PColumnFamily family = familyByString.get(familyName);
+        if (family == null) {
+            throw new ColumnFamilyNotFoundException(familyName);
+        }
+        return family;
+    }
+
+    @Override
+    public PColumnFamily getColumnFamily(byte[] familyBytes) throws ColumnFamilyNotFoundException {
+        PColumnFamily family = familyByBytes.get(familyBytes);
+        if (family == null) {
+            String familyName = Bytes.toString(familyBytes);
+            throw new ColumnFamilyNotFoundException(familyName);
+        }
+        return family;
+    }
+
+    @Override
+    public List<PColumn> getColumns() {
+        return allColumns;
+    }
+
+    @Override
+    public long getSequenceNumber() {
+        return sequenceNumber;
+    }
+
+    @Override
+    public long getTimeStamp() {
+        return timeStamp;
+    }
+
+    @Override
+    public PTableStats getTableStats() {
+        return stats;
+    }
+
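+    // Deserializes the table metadata in the same field order that write() produces, then delegates to init().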
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        byte[] schemaNameBytes = Bytes.readByteArray(input);
+        byte[] tableNameBytes = Bytes.readByteArray(input);
+        PName schemaName = PNameFactory.newName(schemaNameBytes);
+        PName tableName = PNameFactory.newName(tableNameBytes);
+        PTableType tableType = PTableType.values()[WritableUtils.readVInt(input)];
+        PIndexState indexState = null;
+        if (tableType == PTableType.INDEX) {
+            int ordinal = WritableUtils.readVInt(input);
+            if (ordinal >= 0) {
+                indexState = PIndexState.values()[ordinal];
+            }
+        }
+        long sequenceNumber = WritableUtils.readVLong(input);
+        long timeStamp = input.readLong();
+        byte[] pkNameBytes = Bytes.readByteArray(input);
+        PName pkName = pkNameBytes.length == 0 ? null : PNameFactory.newName(pkNameBytes);
+        Integer bucketNum = WritableUtils.readVInt(input);
+        int nColumns = WritableUtils.readVInt(input);
+        List<PColumn> columns = Lists.newArrayListWithExpectedSize(nColumns);
+        for (int i = 0; i < nColumns; i++) {
+            PColumn column = new PColumnImpl();
+            column.readFields(input);
+            columns.add(column);
+        }
+        int nIndexes = WritableUtils.readVInt(input);
+        List<PTable> indexes = Lists.newArrayListWithExpectedSize(nIndexes);
+        for (int i = 0; i < nIndexes; i++) {
+            PTable index = new PTableImpl();
+            index.readFields(input);
+            indexes.add(index);
+        }
+        boolean isImmutableRows = input.readBoolean();
+        Map<String, byte[][]> guidePosts = new HashMap<String, byte[][]>();
+        int size = WritableUtils.readVInt(input);
+        for (int i=0; i<size; i++) {
+            String key = WritableUtils.readString(input);
+            int valueSize = WritableUtils.readVInt(input);
+            byte[][] value = new byte[valueSize][];
+            for (int j=0; j<valueSize; j++) {
+                value[j] = Bytes.readByteArray(input);
+            }
+            guidePosts.put(key, value);
+        }
+        byte[] dataTableNameBytes = Bytes.readByteArray(input);
+        PName dataTableName = dataTableNameBytes.length == 0 ? null : PNameFactory.newName(dataTableNameBytes);
+        byte[] defaultFamilyNameBytes = Bytes.readByteArray(input);
+        PName defaultFamilyName = defaultFamilyNameBytes.length == 0 ? null : PNameFactory.newName(defaultFamilyNameBytes);
+        boolean disableWAL = input.readBoolean();
+        boolean multiTenant = input.readBoolean();
+        ViewType viewType = null;
+        String viewStatement = null;
+        List<PName> physicalNames = Collections.emptyList();
+        if (tableType == PTableType.VIEW) {
+            viewType = ViewType.fromSerializedValue(input.readByte());
+            byte[] viewStatementBytes = Bytes.readByteArray(input);
+            viewStatement = viewStatementBytes.length == 0 ? null : (String)PDataType.VARCHAR.toObject(viewStatementBytes);
+            int nPhysicalNames = WritableUtils.readVInt(input);
+            physicalNames = Lists.newArrayListWithExpectedSize(nPhysicalNames);
+            for (int i = 0; i < nPhysicalNames; i++) {
+                byte[] physicalNameBytes = Bytes.readByteArray(input);
+                physicalNames.add(PNameFactory.newName(physicalNameBytes));
+            }
+        }
+        PTableStats stats = new PTableStatsImpl(guidePosts);
+        try {
+            init(schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
+                 bucketNum.equals(NO_SALTING) ? null : bucketNum, columns, stats, dataTableName,
+                 indexes, isImmutableRows, physicalNames, defaultFamilyName,
+                 viewStatement, disableWAL, multiTenant, viewType);
+        } catch (SQLException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+    }
+
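+    // Serializes the table metadata; the salt column is skipped (it is rebuilt on read) and the
+    // view-specific fields are written only for VIEW tables.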
+    @Override
+    public void write(DataOutput output) throws IOException {
+        Bytes.writeByteArray(output, schemaName.getBytes());
+        Bytes.writeByteArray(output, tableName.getBytes());
+        WritableUtils.writeVInt(output, type.ordinal());
+        if (type == PTableType.INDEX) {
+            WritableUtils.writeVInt(output, state == null ? -1 : state.ordinal());
+        }
+        WritableUtils.writeVLong(output, sequenceNumber);
+        output.writeLong(timeStamp);
+        Bytes.writeByteArray(output, pkName == null ? ByteUtil.EMPTY_BYTE_ARRAY : pkName.getBytes());
+        int offset = 0, nColumns = allColumns.size();
+        if (bucketNum == null) {
+            WritableUtils.writeVInt(output, NO_SALTING);
+        } else {
+            offset = 1;
+            nColumns--;
+            WritableUtils.writeVInt(output, bucketNum);
+        }
+        WritableUtils.writeVInt(output, nColumns);
+        for (int i = offset; i < allColumns.size(); i++) {
+            PColumn column = allColumns.get(i);
+            column.write(output);
+        }
+        WritableUtils.writeVInt(output, indexes.size());
+        for (PTable index: indexes) {
+            index.write(output);
+        }
+        output.writeBoolean(isImmutableRows);
+        stats.write(output);
+        Bytes.writeByteArray(output, parentTableName == null ? ByteUtil.EMPTY_BYTE_ARRAY : parentTableName.getBytes());
+        Bytes.writeByteArray(output, defaultFamilyName == null ? ByteUtil.EMPTY_BYTE_ARRAY : defaultFamilyName.getBytes());
+        output.writeBoolean(disableWAL);
+        output.writeBoolean(multiTenant);
+        if (type == PTableType.VIEW) {
+            output.writeByte(viewType.getSerializedValue());
+            Bytes.writeByteArray(output, viewStatement == null ? ByteUtil.EMPTY_BYTE_ARRAY : PDataType.VARCHAR.toBytes(viewStatement));
+            WritableUtils.writeVInt(output, physicalNames.size());
+            for (int i = 0; i < physicalNames.size(); i++) {
+                Bytes.writeByteArray(output, physicalNames.get(i).getBytes());
+            }
+        }
+    }
+
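+    // Like getColumn(), but resolves a duplicate name to the PK column (the one with no column family).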
+    @Override
+    public PColumn getPKColumn(String name) throws ColumnNotFoundException {
+        List<PColumn> columns = columnsByName.get(name);
+        int size = columns.size();
+        if (size == 0) {
+            throw new ColumnNotFoundException(name);
+        }
+        if (size > 1) {
+            do {
+                PColumn column = columns.get(--size);
+                if (column.getFamilyName() == null) {
+                    return column;
+                }
+            } while (size > 0);
+            throw new ColumnNotFoundException(name);
+        }
+        return columns.get(0);
+    }
+
+    @Override
+    public PName getPKName() {
+        return pkName;
+    }
+
+    @Override
+    public RowKeySchema getRowKeySchema() {
+        return rowKeySchema;
+    }
+
+    @Override
+    public Integer getBucketNum() {
+        return bucketNum;
+    }
+
+    @Override
+    public List<PTable> getIndexes() {
+        return indexes;
+    }
+
+    @Override
+    public PIndexState getIndexState() {
+        return state;
+    }
+
+    @Override
+    public PName getParentTableName() {
+        return parentTableName;
+    }
+
+    @Override
+    public PName getParentName() {
+        return parentName;
+    }
+
+    @Override
+    public synchronized IndexMaintainer getIndexMaintainer(PTable dataTable) {
+        if (indexMaintainer == null) {
+            indexMaintainer = IndexMaintainer.create(dataTable, this);
+        }
+        return indexMaintainer;
+    }
+
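+    // Lazily serializes the maintainers for this table's indexes and caches the result for reuse.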
+    @Override
+    public synchronized void getIndexMaintainers(ImmutableBytesWritable ptr) {
+        if (indexMaintainersPtr == null) {
+            indexMaintainersPtr = new ImmutableBytesWritable();
+            if (indexes.isEmpty()) {
+                indexMaintainersPtr.set(ByteUtil.EMPTY_BYTE_ARRAY);
+            } else {
+                IndexMaintainer.serialize(this, indexMaintainersPtr);
+            }
+        }
+        ptr.set(indexMaintainersPtr.get(), indexMaintainersPtr.getOffset(), indexMaintainersPtr.getLength());
+    }
+    
+    @Override
+    public PName getPhysicalName() {
+        return physicalNames.isEmpty() ? getName() : physicalNames.get(0);
+    }
+    
+    @Override
+    public List<PName> getPhysicalNames() {
+        return physicalNames;
+    }
+
+    @Override
+    public PName getDefaultFamilyName() {
+        return defaultFamilyName;
+    }
+
+    @Override
+    public String getViewStatement() {
+        return viewStatement;
+    }
+
+    @Override
+    public boolean isWALDisabled() {
+        return disableWAL;
+    }
+}
\ No newline at end of file