You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/27 06:14:37 UTC

[04/17] phoenix git commit: PHOENIX-1598 Encode column names to save space and improve performance

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 01e8afe..91a41a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -17,13 +17,32 @@
  */
 package org.apache.phoenix.schema;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.util.EncodedColumnsUtil.isReservedColumnQualifier;
+
+import java.io.DataOutputStream;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.annotation.Nullable;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PArrayDataTypeDecoder;
+import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+
+import com.google.common.annotations.VisibleForTesting;
 
 
 /**
@@ -129,7 +148,7 @@ public interface PTable extends PMetaDataEntity {
          * Link from a view to its parent table
          */
         PARENT_TABLE((byte)3);
-
+        
         private final byte[] byteValue;
         private final byte serializedValue;
 
@@ -153,6 +172,318 @@ public interface PTable extends PMetaDataEntity {
             return LinkType.values()[serializedValue-1];
         }
     }
+    
+    public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier {
+        ONE_CELL_PER_COLUMN((byte)1) {
+            @Override
+            public ColumnValueEncoder getEncoder(int numElements) {
+                throw new UnsupportedOperationException();
+            }
+            
+            @Override
+            public ColumnValueDecoder getDecoder() {
+                throw new UnsupportedOperationException();
+            }
+        },
+        // stores a single cell per column family that contains all serialized column values
+        SINGLE_CELL_ARRAY_WITH_OFFSETS((byte)2) {
+            @Override
+            public ColumnValueEncoder getEncoder(int numElements) {
+                PDataType type = PVarbinary.INSTANCE;
+                int estimatedSize = PArrayDataType.estimateSize(numElements, type);
+                TrustedByteArrayOutputStream byteStream = new TrustedByteArrayOutputStream(estimatedSize);
+                DataOutputStream oStream = new DataOutputStream(byteStream);
+                return new PArrayDataTypeEncoder(byteStream, oStream, numElements, type, SortOrder.ASC, false, PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION);
+            }
+            
+            @Override
+            public ColumnValueDecoder getDecoder() {
+                return new PArrayDataTypeDecoder();
+            }
+        };
+
+        private final byte serializedValue;
+        
+        private ImmutableStorageScheme(byte serializedValue) {
+            this.serializedValue = serializedValue;
+        }
+
+        public byte getSerializedMetadataValue() {
+            return this.serializedValue;
+        }
+
+        public static ImmutableStorageScheme fromSerializedValue(byte serializedValue) {
+            if (serializedValue < 1 || serializedValue > ImmutableStorageScheme.values().length) {
+                return null;
+            }
+            return ImmutableStorageScheme.values()[serializedValue-1];
+        }
+
+    }
+    
+    interface ColumnValueEncoderDecoderSupplier {
+        ColumnValueEncoder getEncoder(int numElements);
+        ColumnValueDecoder getDecoder();
+    }
+    
+    public enum QualifierEncodingScheme implements QualifierEncoderDecoder {
+        NON_ENCODED_QUALIFIERS((byte)0, null) {
+            @Override
+            public byte[] encode(int value) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int decode(byte[] bytes) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public int decode(byte[] bytes, int offset, int length) {
+                throw new UnsupportedOperationException();
+            }
+            
+            @Override
+            public String toString() {
+                return name();
+            }
+        },
+        ONE_BYTE_QUALIFIERS((byte)1, 255) {
+            private final int c = Math.abs(Byte.MIN_VALUE);
+            
+            @Override
+            public byte[] encode(int value) {
+                if (isReservedColumnQualifier(value)) {
+                    return FOUR_BYTE_QUALIFIERS.encode(value);
+                }
+                if (value < 0 || value > maxQualifier) {
+                    throw new QualifierOutOfRangeException(0, maxQualifier);
+                }
+                return new byte[]{(byte)(value - c)};
+            }
+
+            @Override
+            public int decode(byte[] bytes) {
+                if (bytes.length == 4) {
+                    return getReservedQualifier(bytes);
+                }
+                if (bytes.length != 1) {
+                    throw new InvalidQualifierBytesException(1, bytes.length);
+                }
+                return bytes[0] + c;
+            }
+
+            @Override
+            public int decode(byte[] bytes, int offset, int length) {
+                if (length == 4) {
+                    return getReservedQualifier(bytes, offset, length);
+                }
+                if (length != 1) {
+                    throw new InvalidQualifierBytesException(1, length);
+                }
+                return bytes[offset] + c;
+            }
+            
+            @Override
+            public String toString() {
+                return name();
+            }
+        },
+        TWO_BYTE_QUALIFIERS((byte)2, 65535) {
+            private final int c = Math.abs(Short.MIN_VALUE);
+            
+            @Override
+            public byte[] encode(int value) {
+                if (isReservedColumnQualifier(value)) {
+                    return FOUR_BYTE_QUALIFIERS.encode(value);
+                }
+                if (value < 0 || value > maxQualifier) {
+                    throw new QualifierOutOfRangeException(0, maxQualifier);
+                }
+                return Bytes.toBytes((short)(value - c));
+            }
+
+            @Override
+            public int decode(byte[] bytes) {
+                if (bytes.length == 4) {
+                    return getReservedQualifier(bytes);
+                }
+                if (bytes.length != 2) {
+                    throw new InvalidQualifierBytesException(2, bytes.length);
+                }
+                return Bytes.toShort(bytes) + c;
+            }
+
+            @Override
+            public int decode(byte[] bytes, int offset, int length) {
+                if (length == 4) {
+                    return getReservedQualifier(bytes, offset, length);
+                }
+                if (length != 2) {
+                    throw new InvalidQualifierBytesException(2, length);
+                }
+                return Bytes.toShort(bytes, offset, length) + c;
+            }
+            
+            @Override
+            public String toString() {
+                return name();
+            }
+        },
+        THREE_BYTE_QUALIFIERS((byte)3, 16777215) {
+            @Override
+            public byte[] encode(int value) {
+                if (isReservedColumnQualifier(value)) {
+                    return FOUR_BYTE_QUALIFIERS.encode(value);
+                }
+                if (value < 0 || value > maxQualifier) {
+                    throw new QualifierOutOfRangeException(0, maxQualifier);
+                }
+                byte[] arr = Bytes.toBytes(value);
+                return new byte[]{arr[1], arr[2], arr[3]};
+            }
+
+            @Override
+            public int decode(byte[] bytes) {
+                if (bytes.length == 4) {
+                    return getReservedQualifier(bytes);
+                }
+                if (bytes.length != 3) {
+                    throw new InvalidQualifierBytesException(2, bytes.length);
+                }
+                byte[] toReturn = new byte[4];
+                toReturn[1] = bytes[0];
+                toReturn[2] = bytes[1];
+                toReturn[3] = bytes[2];
+                return Bytes.toInt(toReturn);
+            }
+
+            @Override
+            public int decode(byte[] bytes, int offset, int length) {
+                if (length == 4) {
+                    return getReservedQualifier(bytes, offset, length);
+                }
+                if (length != 3) {
+                    throw new InvalidQualifierBytesException(3, length);
+                }
+                byte[] toReturn = new byte[4];
+                toReturn[1] = bytes[offset];
+                toReturn[2] = bytes[offset + 1];
+                toReturn[3] = bytes[offset + 2];
+                return Bytes.toInt(toReturn);
+            }
+            
+            @Override
+            public String toString() {
+                return name();
+            }
+        },
+        FOUR_BYTE_QUALIFIERS((byte)4, Integer.MAX_VALUE) {
+            @Override
+            public byte[] encode(int value) {
+                if (value < 0) {
+                    throw new QualifierOutOfRangeException(0, maxQualifier);
+                }
+                return Bytes.toBytes(value);
+            }
+
+            @Override
+            public int decode(byte[] bytes) {
+                if (bytes.length != 4) {
+                    throw new InvalidQualifierBytesException(4, bytes.length);
+                }
+                return Bytes.toInt(bytes);
+            }
+
+            @Override
+            public int decode(byte[] bytes, int offset, int length) {
+                if (length != 4) {
+                    throw new InvalidQualifierBytesException(4, length);
+                }
+                return Bytes.toInt(bytes, offset, length);
+            }
+            
+            @Override
+            public String toString() {
+                return name();
+            }
+        };
+        
+        final byte metadataValue;
+        final Integer maxQualifier;
+        
+        public byte getSerializedMetadataValue() {
+            return this.metadataValue;
+        }
+
+        public static QualifierEncodingScheme fromSerializedValue(byte serializedValue) {
+            if (serializedValue < 0 || serializedValue >= QualifierEncodingScheme.values().length) {
+                return null;
+            }
+            return QualifierEncodingScheme.values()[serializedValue];
+        }
+        
+        @Override
+        public Integer getMaxQualifier() {
+            return maxQualifier;
+        }
+
+        private QualifierEncodingScheme(byte serializedMetadataValue, Integer maxQualifier) {
+            this.metadataValue = serializedMetadataValue;
+            this.maxQualifier = maxQualifier;
+        }
+        
+        @VisibleForTesting
+        public static class QualifierOutOfRangeException extends RuntimeException {
+            public QualifierOutOfRangeException(int minQualifier, int maxQualifier) {
+                super("Qualifier out of range (" + minQualifier + ", " + maxQualifier + ")"); 
+            }
+        }
+        
+        @VisibleForTesting
+        public static class InvalidQualifierBytesException extends RuntimeException {
+            public InvalidQualifierBytesException(int expectedLength, int actualLength) {
+                super("Invalid number of qualifier bytes. Expected length: " + expectedLength + ". Actual: " + actualLength);
+            }
+        }
+
+        /**
+         * We generate our column qualifiers in the reserved range 0-10 using the FOUR_BYTE_QUALIFIERS
+         * encoding. When adding Cells corresponding to the reserved qualifiers to the
+         * EncodedColumnQualifierCells list, we need to make sure that we use the FOUR_BYTE_QUALIFIERS
+         * scheme to decode the correct int value.
+         */
+        private static int getReservedQualifier(byte[] bytes) {
+            checkArgument(bytes.length == 4);
+            int number = FOUR_BYTE_QUALIFIERS.decode(bytes);
+            if (!isReservedColumnQualifier(number)) {
+                throw new InvalidQualifierBytesException(4, bytes.length);
+            }
+            return number;
+        }
+
+        /**
+         * We generate our column qualifiers in the reserved range 0-10 using the FOUR_BYTE_QUALIFIERS
+         * encoding. When adding Cells corresponding to the reserved qualifiers to the
+         * EncodedColumnQualifierCells list, we need to make sure that we use the FOUR_BYTE_QUALIFIERS
+         * scheme to decode the correct int value.
+         */
+        private static int getReservedQualifier(byte[] bytes, int offset, int length) {
+            checkArgument(length == 4);
+            int number = FOUR_BYTE_QUALIFIERS.decode(bytes, offset, length);
+            if (!isReservedColumnQualifier(number)) {
+                throw new InvalidQualifierBytesException(4, length);
+            }
+            return number;
+        }
+    }
+    
+    interface QualifierEncoderDecoder {
+        byte[] encode(int value);
+        int decode(byte[] bytes);
+        int decode(byte[] bytes, int offset, int length);
+        Integer getMaxQualifier();
+    }
 
     long getTimeStamp();
     long getSequenceNumber();
@@ -208,7 +539,16 @@ public interface PTable extends PMetaDataEntity {
      * can be found
      * @throws AmbiguousColumnException if multiple columns are found with the given name
      */
-    PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException;
+    PColumn getColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException;
+    
+    /**
+     * Get the column with the given column qualifier.
+     * @param column qualifier bytes
+     * @return the PColumn with the given column qualifier
+     * @throws ColumnNotFoundException if no column with the given column qualifier can be found
+     * @throws AmbiguousColumnException if multiple columns are found with the given column qualifier
+     */
+    PColumn getColumnForColumnQualifier(byte[] cf, byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException; 
     
     /**
      * Get the PK column with the given name.
@@ -345,7 +685,6 @@ public interface PTable extends PMetaDataEntity {
      */
     int getRowTimestampColPos();
     long getUpdateCacheFrequency();
-
     boolean isNamespaceMapped();
     
     /**
@@ -359,4 +698,94 @@ public interface PTable extends PMetaDataEntity {
      * you are also not allowed to delete the table  
      */
     boolean isAppendOnlySchema();
+    ImmutableStorageScheme getImmutableStorageScheme();
+    QualifierEncodingScheme getEncodingScheme();
+    EncodedCQCounter getEncodedCQCounter();
+    
+    /**
+     * Class to help track encoded column qualifier counters per column family.
+     */
+    public class EncodedCQCounter {
+        
+        private final Map<String, Integer> familyCounters = new HashMap<>();
+        
+        /**
+         * Copy constructor
+         * @param counterToCopy
+         * @return copy of the passed counter
+         */
+        public static EncodedCQCounter copy(EncodedCQCounter counterToCopy) {
+            EncodedCQCounter cqCounter = new EncodedCQCounter();
+            for (Entry<String, Integer> e : counterToCopy.values().entrySet()) {
+                cqCounter.setValue(e.getKey(), e.getValue());
+            }
+            return cqCounter;
+        }
+        
+        public static final EncodedCQCounter NULL_COUNTER = new EncodedCQCounter() {
+
+            @Override
+            public Integer getNextQualifier(String columnFamily) {
+                return null;
+            }
+
+            @Override
+            public void setValue(String columnFamily, Integer value) {
+            }
+
+            @Override
+            public boolean increment(String columnFamily) {
+                return false;
+            }
+
+            @Override
+            public Map<String, Integer> values() {
+                return Collections.emptyMap();
+            }
+
+        };
+        
+        /**
+         * Get the next qualifier to be used for the column family.
+         * This method also ends up initializing the counter if the
+         * column family already doesn't have one.
+         */
+        @Nullable
+        public Integer getNextQualifier(String columnFamily) {
+            Integer counter = familyCounters.get(columnFamily);
+            if (counter == null) {
+                counter = ENCODED_CQ_COUNTER_INITIAL_VALUE;
+                familyCounters.put(columnFamily, counter);
+            }
+            return counter;
+        }
+        
+        public void setValue(String columnFamily, Integer value) {
+            familyCounters.put(columnFamily, value);
+        }
+        
+        /**
+         * 
+         * @param columnFamily
+         * @return true if the counter was incremented, false otherwise.
+         */
+        public boolean increment(String columnFamily) {
+            if (columnFamily == null) {
+                return false;
+            }
+            Integer counter = familyCounters.get(columnFamily);
+            if (counter == null) {
+                counter = ENCODED_CQ_COUNTER_INITIAL_VALUE;
+            }
+            counter++;
+            familyCounters.put(columnFamily, counter);
+            return true;
+        }
+        
+        public Map<String, Integer> values()  {
+            return Collections.unmodifiableMap(familyCounters);
+        }
+        
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index b4e0a06..d91ebcb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -25,11 +25,15 @@ import java.io.IOException;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.annotation.Nonnull;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -41,11 +45,14 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.SingleCellConstructorExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -63,6 +70,7 @@ import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
@@ -79,6 +87,7 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+
 /**
  *
  * Base class for PTable implementors.  Provides abstraction for
@@ -106,7 +115,8 @@ public class PTableImpl implements PTable {
     private List<PColumnFamily> families;
     private Map<byte[], PColumnFamily> familyByBytes;
     private Map<String, PColumnFamily> familyByString;
-    private ListMultimap<String,PColumn> columnsByName;
+    private ListMultimap<String, PColumn> columnsByName;
+    private Map<KVColumnFamilyQualifier, PColumn> kvColumnsByQualifiers;
     private PName pkName;
     private Integer bucketNum;
     private RowKeySchema rowKeySchema;
@@ -138,14 +148,18 @@ public class PTableImpl implements PTable {
     private boolean isNamespaceMapped;
     private String autoPartitionSeqName;
     private boolean isAppendOnlySchema;
+    private ImmutableStorageScheme immutableStorageScheme;
+    private QualifierEncodingScheme qualifierEncodingScheme;
+    private EncodedCQCounter encodedCQCounter;
 
     public PTableImpl() {
         this.indexes = Collections.emptyList();
         this.physicalNames = Collections.emptyList();
         this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
     }
-
-    public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped) { // For base table of mapped VIEW
+    
+    // Constructor used at table creation time
+    public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped) {
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         this.tenantId = tenantId;
         this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, tableName));
@@ -169,8 +183,36 @@ public class PTableImpl implements PTable {
         this.isNamespaceMapped = isNamespaceMapped;
     }
     
+    public PTableImpl(PName tenantId, String schemaName, String tableName, long timestamp, List<PColumnFamily> families, boolean isNamespaceMapped, ImmutableStorageScheme storageScheme, QualifierEncodingScheme encodingScheme) { // For base table of mapped VIEW
+        Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
+        this.tenantId = tenantId;
+        this.name = PNameFactory.newName(SchemaUtil.getTableName(schemaName, tableName));
+        this.key = new PTableKey(tenantId, name.getString());
+        this.schemaName = PNameFactory.newName(schemaName);
+        this.tableName = PNameFactory.newName(tableName);
+        this.type = PTableType.VIEW;
+        this.viewType = ViewType.MAPPED;
+        this.timeStamp = timestamp;
+        this.pkColumns = this.allColumns = Collections.emptyList();
+        this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
+        this.indexes = Collections.emptyList();
+        this.familyByBytes = Maps.newHashMapWithExpectedSize(families.size());
+        this.familyByString = Maps.newHashMapWithExpectedSize(families.size());
+        for (PColumnFamily family : families) {
+            familyByBytes.put(family.getName().getBytes(), family);
+            familyByString.put(family.getName().getString(), family);
+        }
+        this.families = families;
+        this.physicalNames = Collections.emptyList();
+        this.isNamespaceMapped = isNamespaceMapped;
+        this.immutableStorageScheme = storageScheme;
+        this.qualifierEncodingScheme = encodingScheme;
+    }
+    
+    // For indexes stored in shared physical tables
     public PTableImpl(PName tenantId, PName schemaName, PName tableName, long timestamp, List<PColumnFamily> families, 
-            List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped) throws SQLException { // For indexes stored in shared physical tables
+            List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
+            EncodedCQCounter encodedCQCounter) throws SQLException {
         this.pkColumns = this.allColumns = Collections.emptyList();
         this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
         this.indexes = Collections.emptyList();
@@ -184,7 +226,7 @@ public class PTableImpl implements PTable {
         init(tenantId, this.schemaName, this.tableName, PTableType.INDEX, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
             this.schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
             null, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false);
+            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false, storageScheme, qualifierEncodingScheme, encodedCQCounter);
     }
 
     public PTableImpl(long timeStamp) { // For delete marker
@@ -228,7 +270,7 @@ public class PTableImpl implements PTable {
                     indexes, table.isImmutableRows(), physicalNames, table.getDefaultFamilyName(), viewStatement,
                     table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                     table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), updateCacheFrequency,
-                    table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                    table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
         }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, List<PTable> indexes, PName parentSchemaName, String viewStatement) throws SQLException {
@@ -238,7 +280,7 @@ public class PTableImpl implements PTable {
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, Collection<PColumn> columns) throws SQLException {
@@ -248,7 +290,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, Collection<PColumn> columns, PName defaultFamily) throws SQLException {
@@ -258,7 +300,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), defaultFamily, table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException {
@@ -268,7 +310,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
                 table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
-                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -278,7 +320,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
                 table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(),
-                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
@@ -289,7 +331,7 @@ public class PTableImpl implements PTable {
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), 
-                isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -300,7 +342,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
@@ -311,7 +353,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
-                table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table) throws SQLException {
@@ -322,7 +364,7 @@ public class PTableImpl implements PTable {
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
-                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -331,12 +373,12 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema) throws SQLException {
+            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
                 indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional,
-                updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema);
+                updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter);
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -346,13 +388,13 @@ public class PTableImpl implements PTable {
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
             int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped,
-            String autoPartitionSeqName, boolean isAppendOnlySchema)
+            String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
                 defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
                 indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, 
-                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema);
+                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter);
     }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -361,11 +403,13 @@ public class PTableImpl implements PTable {
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
             int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema) throws SQLException {
+            long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, 
+            QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 parentSchemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-                isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema);
+                isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, 
+                qualifierEncodingScheme, encodedCQCounter);
     }
     
     @Override
@@ -399,7 +443,8 @@ public class PTableImpl implements PTable {
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, 
-            boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema) throws SQLException {
+            boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
+            EncodedCQCounter encodedCQCounter) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -435,10 +480,13 @@ public class PTableImpl implements PTable {
         this.isNamespaceMapped = isNamespaceMapped;
         this.autoPartitionSeqName = autoPartitionSeqName;
         this.isAppendOnlySchema = isAppendOnlySchema;
+        this.immutableStorageScheme = storageScheme;
+        this.qualifierEncodingScheme = qualifierEncodingScheme;
         List<PColumn> pkColumns;
         PColumn[] allColumns;
         
         this.columnsByName = ArrayListMultimap.create(columns.size(), 1);
+        this.kvColumnsByQualifiers = Maps.newHashMapWithExpectedSize(columns.size());
         int numPKColumns = 0;
         if (bucketNum != null) {
             // Add salt column to allColumns and pkColumns, but don't add to
@@ -464,11 +512,21 @@ public class PTableImpl implements PTable {
                     if (Objects.equal(familyName, dupColumn.getFamilyName())) {
                         count++;
                         if (count > 1) {
-                            throw new ColumnAlreadyExistsException(null, name.getString(), columnName);
+                            throw new ColumnAlreadyExistsException(schemaName.getString(), name.getString(), columnName);
                         }
                     }
                 }
             }
+            byte[] cq = column.getColumnQualifierBytes();
+            String cf = column.getFamilyName() != null ? column.getFamilyName().getString() : null;
+            if (cf != null && cq != null) {
+                KVColumnFamilyQualifier info = new KVColumnFamilyQualifier(cf, cq);
+                if (kvColumnsByQualifiers.get(info) != null) {
+                    throw new ColumnAlreadyExistsException(schemaName.getString(),
+                            name.getString(), columnName);
+                }
+                kvColumnsByQualifiers.put(info, column);
+            }
         }
         estimatedSize += SizedUtil.sizeOfMap(allColumns.length, SizedUtil.POINTER_SIZE, SizedUtil.sizeOfArrayList(1)); // for multi-map
 
@@ -528,7 +586,7 @@ public class PTableImpl implements PTable {
                 .orderedBy(Bytes.BYTES_COMPARATOR);
         for (int i = 0; i < families.length; i++) {
             Map.Entry<PName,List<PColumn>> entry = iterator.next();
-            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue());
+            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue());//, qualifierEncodingScheme);
             families[i] = family;
             familyByString.put(family.getName().getString(), family);
             familyByBytes.put(family.getName().getBytes(), family);
@@ -554,9 +612,9 @@ public class PTableImpl implements PTable {
         for (PName name : this.physicalNames) {
             estimatedSize += name.getEstimatedSize();
         }
-
         this.estimatedSize = estimatedSize;
         this.baseColumnCount = baseColumnCount;
+        this.encodedCQCounter = encodedCQCounter;
     }
 
     @Override
@@ -746,7 +804,7 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+    public PColumn getColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
         List<PColumn> columns = columnsByName.get(name);
         int size = columns.size();
         if (size == 0) {
@@ -765,6 +823,22 @@ public class PTableImpl implements PTable {
         }
         return columns.get(0);
     }
+    
+    @Override
+    public PColumn getColumnForColumnQualifier(byte[] cf, byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+        Preconditions.checkNotNull(cq);
+        if (!EncodedColumnsUtil.usesEncodedColumnNames(this) || cf == null) {
+            String columnName = (String)PVarchar.INSTANCE.toObject(cq);
+            return getColumnForColumnName(columnName);
+        } else {
+            String family = (String)PVarchar.INSTANCE.toObject(cf);
+            PColumn col = kvColumnsByQualifiers.get(new KVColumnFamilyQualifier(family, cq));
+            if (col == null) {
+                throw new ColumnNotFoundException("No column found for column qualifier " + qualifierEncodingScheme.decode(cq));
+            }
+            return col;
+        }
+    }
 
     /**
      *
@@ -785,6 +859,8 @@ public class PTableImpl implements PTable {
         private Mutation deleteRow;
         private final long ts;
         private final boolean hasOnDupKey;
+        // map from column name to value 
+        private Map<PColumn, byte[]> columnToValueMap; 
 
         public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum, boolean hasOnDupKey) {
             this.kvBuilder = kvBuilder;
@@ -797,7 +873,7 @@ public class PTableImpl implements PTable {
                 this.keyPtr =  new ImmutableBytesPtr(key);
                 this.key = ByteUtil.copyKeyBytesIfNecessary(key);
             }
-
+            this.columnToValueMap = Maps.newHashMapWithExpectedSize(1);
             newMutations();
         }
 
@@ -819,13 +895,48 @@ public class PTableImpl implements PTable {
                 // Include only deleteRow mutation if present because it takes precedence over all others
                 mutations.add(deleteRow);
             } else {
+                // store all columns for a given column family in a single cell instead of one column per cell in order to improve write performance
+                if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+                    Put put = new Put(this.key);
+                    if (isWALDisabled()) {
+                        put.setDurability(Durability.SKIP_WAL);
+                    }
+                    // the setValues Put contains one cell per column, we need to convert it to a Put that contains a cell with all columns for a given column family
+                    for (PColumnFamily family : families) {
+                        byte[] columnFamily = family.getName().getBytes();
+                        Collection<PColumn> columns = family.getColumns();
+                        int maxEncodedColumnQualifier = Integer.MIN_VALUE;
+                        for (PColumn column : columns) {
+                            int qualifier = qualifierEncodingScheme.decode(column.getColumnQualifierBytes());
+                            maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, qualifier);
+                        }
+                        Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier);
+                        for (PColumn column : columns) {
+                        	if (columnToValueMap.containsKey(column)) {
+                        	    int colIndex = qualifierEncodingScheme.decode(column.getColumnQualifierBytes())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
+                        	    colValues[colIndex] = new LiteralExpression(columnToValueMap.get(column));
+                        	}
+                        }
+                        
+                        List<Expression> children = Arrays.asList(colValues);
+                        // we use SingleCellConstructorExpression to serialize all the columns into a single byte[]
+                        SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children);
+                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                        singleCellConstructorExpression.evaluate(null, ptr);
+                        ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily);
+                        addQuietly(put, kvBuilder, kvBuilder.buildPut(keyPtr,
+                            colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
+                    }
+                    setValues = put;
+                }
                 // Because we cannot enforce a not null constraint on a KV column (since we don't know if the row exists when
-                // we upsert it), se instead add a KV that is always emtpy. This allows us to imitate SQL semantics given the
+                // we upsert it), so instead add a KV that is always empty. This allows us to imitate SQL semantics given the
                 // way HBase works.
+                Pair<byte[], byte[]> emptyKvInfo = EncodedColumnsUtil.getEmptyKeyValueInfo(PTableImpl.this);
                 addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
                     SchemaUtil.getEmptyColumnFamilyPtr(PTableImpl.this),
-                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
-                    QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
+                    new ImmutableBytesPtr(emptyKvInfo.getFirst()), ts,
+                    new ImmutableBytesPtr(emptyKvInfo.getSecond())));
                 mutations.add(setValues);
                 if (!unsetValues.isEmpty()) {
                     mutations.add(unsetValues);
@@ -854,7 +965,8 @@ public class PTableImpl implements PTable {
         public void setValue(PColumn column, byte[] byteValue) {
             deleteRow = null;
             byte[] family = column.getFamilyName().getBytes();
-            byte[] qualifier = column.getName().getBytes();
+            byte[] qualifier = column.getColumnQualifierBytes();
+            ImmutableBytesPtr qualifierPtr = new ImmutableBytesPtr(qualifier);
             PDataType<?> type = column.getDataType();
             // Check null, since some types have no byte representation for null
             if (byteValue == null) {
@@ -874,7 +986,7 @@ public class PTableImpl implements PTable {
                 // case of updates occurring due to the execution of the clause.
                 removeIfPresent(setValues, family, qualifier);
                 deleteQuietly(unsetValues, kvBuilder, kvBuilder.buildDeleteColumns(keyPtr, column
-                            .getFamilyName().getBytesPtr(), column.getName().getBytesPtr(), ts));
+                            .getFamilyName().getBytesPtr(), qualifierPtr, ts));
             } else {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable(byteValue);
                 Integer	maxLength = column.getMaxLength();
@@ -887,9 +999,17 @@ public class PTableImpl implements PTable {
                 ptr.set(byteValue);
                 type.pad(ptr, maxLength, sortOrder);
                 removeIfPresent(unsetValues, family, qualifier);
-                addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
-                        column.getFamilyName().getBytesPtr(), column.getName().getBytesPtr(),
+                // store all columns for a given column family in a single cell instead of one column per cell in order to improve write performance
+                // we don't need to do anything with unsetValues as it is only used when storeNulls is false, storeNulls is always true when storeColsInSingleCell is true
+                if (immutableStorageScheme == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    columnToValueMap.put(column, ptr.get());
+                }
+                else {
+                    removeIfPresent(unsetValues, family, qualifier);
+                    addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
+                        column.getFamilyName().getBytesPtr(), qualifierPtr,
                         ts, ptr));
+                }
             }
         }
 
@@ -922,6 +1042,7 @@ public class PTableImpl implements PTable {
                 deleteRow.setDurability(Durability.SKIP_WAL);
             }
         }
+        
     }
 
     @Override
@@ -1082,116 +1203,136 @@ public class PTableImpl implements PTable {
     public IndexType getIndexType() {
         return indexType;
     }
-
+    
     /**
      * Construct a PTable instance from ProtoBuffered PTable instance
      * @param table
      */
     public static PTable createFromProto(PTableProtos.PTable table) {
-      PName tenantId = null;
-      if(table.hasTenantId()){
-        tenantId = PNameFactory.newName(table.getTenantId().toByteArray());
-      }
-      PName schemaName = PNameFactory.newName(table.getSchemaNameBytes().toByteArray());
-      PName tableName = PNameFactory.newName(table.getTableNameBytes().toByteArray());
-      PTableType tableType = PTableType.values()[table.getTableType().ordinal()];
-      PIndexState indexState = null;
-      if (table.hasIndexState()) {
-        indexState = PIndexState.fromSerializedValue(table.getIndexState());
-      }
-      Short viewIndexId = null;
-      if(table.hasViewIndexId()){
-    	  viewIndexId = (short)table.getViewIndexId();
-      }
-      IndexType indexType = IndexType.getDefault();
-      if(table.hasIndexType()){
-          indexType = IndexType.fromSerializedValue(table.getIndexType().toByteArray()[0]);
-      }
-      long sequenceNumber = table.getSequenceNumber();
-      long timeStamp = table.getTimeStamp();
-      long indexDisableTimestamp = table.getIndexDisableTimestamp();
-      PName pkName = null;
-      if (table.hasPkNameBytes()) {
-        pkName = PNameFactory.newName(table.getPkNameBytes().toByteArray());
-      }
-      int bucketNum = table.getBucketNum();
-      List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumnsCount());
-      for (PTableProtos.PColumn curPColumnProto : table.getColumnsList()) {
-        columns.add(PColumnImpl.createFromProto(curPColumnProto));
-      }
-      List<PTable> indexes = Lists.newArrayListWithExpectedSize(table.getIndexesCount());
-      for (PTableProtos.PTable curPTableProto : table.getIndexesList()) {
-        indexes.add(createFromProto(curPTableProto));
-      }
+        PName tenantId = null;
+        if(table.hasTenantId()){
+            tenantId = PNameFactory.newName(table.getTenantId().toByteArray());
+        }
+        PName schemaName = PNameFactory.newName(table.getSchemaNameBytes().toByteArray());
+        PName tableName = PNameFactory.newName(table.getTableNameBytes().toByteArray());
+        PTableType tableType = PTableType.values()[table.getTableType().ordinal()];
+        PIndexState indexState = null;
+        if (table.hasIndexState()) {
+            indexState = PIndexState.fromSerializedValue(table.getIndexState());
+        }
+        Short viewIndexId = null;
+        if(table.hasViewIndexId()){
+            viewIndexId = (short)table.getViewIndexId();
+        }
+        IndexType indexType = IndexType.getDefault();
+        if(table.hasIndexType()){
+            indexType = IndexType.fromSerializedValue(table.getIndexType().toByteArray()[0]);
+        }
+        long sequenceNumber = table.getSequenceNumber();
+        long timeStamp = table.getTimeStamp();
+        long indexDisableTimestamp = table.getIndexDisableTimestamp();
+        PName pkName = null;
+        if (table.hasPkNameBytes()) {
+            pkName = PNameFactory.newName(table.getPkNameBytes().toByteArray());
+        }
+        int bucketNum = table.getBucketNum();
+        List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumnsCount());
+        for (PTableProtos.PColumn curPColumnProto : table.getColumnsList()) {
+            columns.add(PColumnImpl.createFromProto(curPColumnProto));
+        }
+        List<PTable> indexes = Lists.newArrayListWithExpectedSize(table.getIndexesCount());
+        for (PTableProtos.PTable curPTableProto : table.getIndexesList()) {
+            indexes.add(createFromProto(curPTableProto));
+        }
 
-      boolean isImmutableRows = table.getIsImmutableRows();
-      PName parentSchemaName = null;
-      PName parentTableName = null;
-      if (table.hasParentNameBytes()) {
-        parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName((table.getParentNameBytes().toByteArray())));
-        parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(table.getParentNameBytes().toByteArray()));
-      }
-      PName defaultFamilyName = null;
-      if (table.hasDefaultFamilyName()) {
-        defaultFamilyName = PNameFactory.newName(table.getDefaultFamilyName().toByteArray());
-      }
-      boolean disableWAL = table.getDisableWAL();
-      boolean multiTenant = table.getMultiTenant();
-      boolean storeNulls = table.getStoreNulls();
-      boolean isTransactional = table.getTransactional();
-      ViewType viewType = null;
-      String viewStatement = null;
-      List<PName> physicalNames = Collections.emptyList();
-      if (tableType == PTableType.VIEW) {
-        viewType = ViewType.fromSerializedValue(table.getViewType().toByteArray()[0]);
-      }
-      if(table.hasViewStatement()){
-        viewStatement = (String) PVarchar.INSTANCE.toObject(table.getViewStatement().toByteArray());
-      }
-      if (tableType == PTableType.VIEW || viewIndexId != null) {
-        physicalNames = Lists.newArrayListWithExpectedSize(table.getPhysicalNamesCount());
-        for(int i = 0; i < table.getPhysicalNamesCount(); i++){
-          physicalNames.add(PNameFactory.newName(table.getPhysicalNames(i).toByteArray()));
+        boolean isImmutableRows = table.getIsImmutableRows();
+        PName parentSchemaName = null;
+        PName parentTableName = null;
+        if (table.hasParentNameBytes()) {
+            parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName((table.getParentNameBytes().toByteArray())));
+            parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(table.getParentNameBytes().toByteArray()));
+        }
+        PName defaultFamilyName = null;
+        if (table.hasDefaultFamilyName()) {
+            defaultFamilyName = PNameFactory.newName(table.getDefaultFamilyName().toByteArray());
+        }
+        boolean disableWAL = table.getDisableWAL();
+        boolean multiTenant = table.getMultiTenant();
+        boolean storeNulls = table.getStoreNulls();
+        boolean isTransactional = table.getTransactional();
+        ViewType viewType = null;
+        String viewStatement = null;
+        List<PName> physicalNames = Collections.emptyList();
+        if (tableType == PTableType.VIEW) {
+            viewType = ViewType.fromSerializedValue(table.getViewType().toByteArray()[0]);
+        }
+        if(table.hasViewStatement()){
+            viewStatement = (String) PVarchar.INSTANCE.toObject(table.getViewStatement().toByteArray());
+        }
+        if (tableType == PTableType.VIEW || viewIndexId != null) {
+            physicalNames = Lists.newArrayListWithExpectedSize(table.getPhysicalNamesCount());
+            for(int i = 0; i < table.getPhysicalNamesCount(); i++) {
+                physicalNames.add(PNameFactory.newName(table.getPhysicalNames(i).toByteArray()));
+            }
+        }
+        int baseColumnCount = -1;
+        if (table.hasBaseColumnCount()) {
+            baseColumnCount = table.getBaseColumnCount();
         }
-      }
-      
-      int baseColumnCount = -1;
-      if (table.hasBaseColumnCount()) {
-          baseColumnCount = table.getBaseColumnCount();
-      }
 
-      boolean rowKeyOrderOptimizable = false;
-      if (table.hasRowKeyOrderOptimizable()) {
-          rowKeyOrderOptimizable = table.getRowKeyOrderOptimizable();
-      }
-      long updateCacheFrequency = 0;
-      if (table.hasUpdateCacheFrequency()) {
-          updateCacheFrequency = table.getUpdateCacheFrequency();
-      }
-      boolean isNamespaceMapped=false;
-      if (table.hasIsNamespaceMapped()) {
-          isNamespaceMapped = table.getIsNamespaceMapped();
-      }
-      String autoParititonSeqName = null;
-      if (table.hasAutoParititonSeqName()) {
-          autoParititonSeqName = table.getAutoParititonSeqName();
-      }
-      boolean isAppendOnlySchema = false;
-      if (table.hasIsAppendOnlySchema()) {
-          isAppendOnlySchema = table.getIsAppendOnlySchema();
-      }
-      
-      try {
-        PTableImpl result = new PTableImpl();
-        result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
-            (bucketNum == NO_SALTING) ? null : bucketNum, columns, parentSchemaName, parentTableName, indexes,
-            isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
-            multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName, isAppendOnlySchema);
-        return result;
-      } catch (SQLException e) {
-        throw new RuntimeException(e); // Impossible
-      }
+        boolean rowKeyOrderOptimizable = false;
+        if (table.hasRowKeyOrderOptimizable()) {
+            rowKeyOrderOptimizable = table.getRowKeyOrderOptimizable();
+        }
+        long updateCacheFrequency = 0;
+        if (table.hasUpdateCacheFrequency()) {
+            updateCacheFrequency = table.getUpdateCacheFrequency();
+        }
+        boolean isNamespaceMapped=false;
+        if (table.hasIsNamespaceMapped()) {
+            isNamespaceMapped = table.getIsNamespaceMapped();
+        }
+        String autoParititonSeqName = null;
+        if (table.hasAutoParititonSeqName()) {
+            autoParititonSeqName = table.getAutoParititonSeqName();
+        }
+        boolean isAppendOnlySchema = false;
+        if (table.hasIsAppendOnlySchema()) {
+            isAppendOnlySchema = table.getIsAppendOnlySchema();
+        }
+        ImmutableStorageScheme storageScheme = null;
+        if (table.hasStorageScheme()) {
+            storageScheme = ImmutableStorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]);
+        }
+        QualifierEncodingScheme qualifierEncodingScheme = null;
+        if (table.hasEncodingScheme()) {
+            qualifierEncodingScheme = QualifierEncodingScheme.fromSerializedValue(table.getEncodingScheme().toByteArray()[0]);
+        }
+        EncodedCQCounter encodedColumnQualifierCounter = null;
+        if ((!EncodedColumnsUtil.usesEncodedColumnNames(qualifierEncodingScheme) || tableType == PTableType.VIEW)) {
+        	encodedColumnQualifierCounter = PTable.EncodedCQCounter.NULL_COUNTER;
+        }
+        else {
+        	encodedColumnQualifierCounter = new EncodedCQCounter();
+        	if (table.getEncodedCQCountersList() != null) {
+        		for (org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedCQCounter cqCounterFromProto : table.getEncodedCQCountersList()) {
+        			encodedColumnQualifierCounter.setValue(cqCounterFromProto.getColFamily(), cqCounterFromProto.getCounter());
+        		}
+        	}
+        }
+
+        try {
+            PTableImpl result = new PTableImpl();
+            result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
+                (bucketNum == NO_SALTING) ? null : bucketNum, columns, parentSchemaName, parentTableName, indexes,
+                        isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
+                        multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
+                        isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName, 
+                        isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedColumnQualifierCounter);
+            return result;
+        } catch (SQLException e) {
+            throw new RuntimeException(e); // Impossible
+        }
     }
 
     public static PTableProtos.PTable toProto(PTable table) {
@@ -1269,10 +1410,25 @@ public class PTableImpl implements PTable {
       builder.setUpdateCacheFrequency(table.getUpdateCacheFrequency());
       builder.setIndexDisableTimestamp(table.getIndexDisableTimestamp());
       builder.setIsNamespaceMapped(table.isNamespaceMapped());
-      if (table.getAutoPartitionSeqName()!= null) {
+      if (table.getAutoPartitionSeqName() != null) {
           builder.setAutoParititonSeqName(table.getAutoPartitionSeqName());
       }
       builder.setIsAppendOnlySchema(table.isAppendOnlySchema());
+      if (table.getImmutableStorageScheme() != null) {
+          builder.setStorageScheme(ByteStringer.wrap(new byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()}));
+      }
+      if (table.getEncodedCQCounter() != null) {
+          Map<String, Integer> values = table.getEncodedCQCounter().values();
+          for (Entry<String, Integer> cqCounter : values.entrySet()) {
+              org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedCQCounter.Builder cqBuilder = org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedCQCounter.newBuilder();
+              cqBuilder.setColFamily(cqCounter.getKey());
+              cqBuilder.setCounter(cqCounter.getValue());
+              builder.addEncodedCQCounters(cqBuilder.build());
+          }
+      }
+      if (table.getEncodingScheme() != null) {
+          builder.setEncodingScheme(ByteStringer.wrap(new byte[]{table.getEncodingScheme().getSerializedMetadataValue()}));
+      }
       return builder.build();
     }
 
@@ -1342,4 +1498,54 @@ public class PTableImpl implements PTable {
         } else if (!key.equals(other.getKey())) return false;
         return true;
     }
+    
+    @Override
+    public ImmutableStorageScheme getImmutableStorageScheme() {
+        return immutableStorageScheme;
+    }
+    
+    @Override
+    public EncodedCQCounter getEncodedCQCounter() {
+        return encodedCQCounter;
+    }
+
+    @Override
+    public QualifierEncodingScheme getEncodingScheme() {
+        return qualifierEncodingScheme;
+    }
+    
+    private static final class KVColumnFamilyQualifier {
+        @Nonnull
+        private final String colFamilyName;
+        @Nonnull
+        private final byte[] colQualifier;
+
+        public KVColumnFamilyQualifier(String colFamilyName, byte[] colQualifier) {
+            Preconditions.checkArgument(colFamilyName != null && colQualifier != null,
+                "None of the arguments, column family name or column qualifier can be null");
+            this.colFamilyName = colFamilyName;
+            this.colQualifier = colQualifier;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + colFamilyName.hashCode();
+            result = prime * result + Arrays.hashCode(colQualifier);
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            KVColumnFamilyQualifier other = (KVColumnFamilyQualifier) obj;
+            if (!colFamilyName.equals(other.colFamilyName)) return false;
+            if (!Arrays.equals(colQualifier, other.colQualifier)) return false;
+            return true;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java
index 42699d9..017c75d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableKey.java
@@ -28,7 +28,11 @@ public class PTableKey {
     public PTableKey(PName tenantId, String name) {
         Preconditions.checkNotNull(name);
         this.tenantId = tenantId;
-        this.name = name;
+        if (name.indexOf(QueryConstants.NAMESPACE_SEPARATOR) != -1) {
+            this.name = name.replace(QueryConstants.NAMESPACE_SEPARATOR, QueryConstants.NAME_SEPARATOR);
+        } else {
+            this.name = name;
+        }
     }
 
     public PName getTenantId() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
index 19dd1c1..d875982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
@@ -24,14 +24,16 @@ public class ProjectedColumn extends DelegateColumn {
     private final int position;
     private final boolean nullable;
     private final ColumnRef sourceColumnRef;
+    private final byte[] cq;
 
-    public ProjectedColumn(PName name, PName familyName, int position, boolean nullable, ColumnRef sourceColumnRef) {
+    public ProjectedColumn(PName name, PName familyName, int position, boolean nullable, ColumnRef sourceColumnRef, byte[] cq) {
         super(sourceColumnRef.getColumn());
         this.name = name;
         this.familyName = familyName;
         this.position = position;
         this.nullable = nullable;
         this.sourceColumnRef = sourceColumnRef;
+        this.cq = cq;
     }
     
     @Override
@@ -39,6 +41,7 @@ public class ProjectedColumn extends DelegateColumn {
         return name;
     }
     
+    @Override
     public PName getFamilyName() {
         return familyName;
     }
@@ -52,7 +55,12 @@ public class ProjectedColumn extends DelegateColumn {
     public boolean isNullable() {
         return nullable;
     }
-
+    
+    @Override
+    public byte[] getColumnQualifierBytes() {
+        return cq;
+    }
+    
     public ColumnRef getSourceColumnRef() {
         return sourceColumnRef;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
index 734a9ed..23cfd1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
@@ -38,7 +38,7 @@ public class SaltingUtil {
     public static final String SALTING_COLUMN_NAME = "_SALT";
     public static final String SALTED_ROW_KEY_NAME = "_SALTED_KEY";
     public static final PColumnImpl SALTING_COLUMN = new PColumnImpl(
-            PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false);
+            PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false, null);
     public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(2)
         .addField(SALTING_COLUMN, false, SortOrder.getDefault())
         .addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 9962859..3282cc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.util.SchemaUtil;
 
 public enum TableProperty {
@@ -145,6 +146,47 @@ public enum TableProperty {
         }       
 	    
 	},
+	
+	COLUMN_ENCODED_BYTES(PhoenixDatabaseMetaData.ENCODING_SCHEME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, false, false, false) {
+	    @Override
+        public Object getValue(Object value) {
+	        if (value instanceof String) {
+	            String strValue = (String) value;
+	            if ("NONE".equalsIgnoreCase(strValue)) {
+	                return (byte)0;
+	            } 
+	        } else {
+	            return value == null ? null : ((Number) value).byteValue();
+	        }
+	        return value;
+	    }
+
+		@Override
+		public Object getPTableValue(PTable table) {
+			return table.getEncodingScheme();
+		}	
+	    
+	},
+    
+    IMMUTABLE_STORAGE_SCHEME(PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
+        @Override
+        public ImmutableStorageScheme getValue(Object value) {
+            if (value == null) {
+                return null;
+            } else if (value instanceof String) {
+                String strValue = (String) value;
+                return ImmutableStorageScheme.valueOf(strValue.toUpperCase());
+            } else {
+                throw new IllegalArgumentException("Immutable storage scheme table property must be a string");
+            }
+        }
+
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.getImmutableStorageScheme();
+        }   
+        
+    }
     ;
 	
 	private final String propertyName;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
index a8dc487..8028eb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java
@@ -17,11 +17,50 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
 
 public abstract class BaseTuple implements Tuple {
+    @Override
+    public int size() {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public boolean isImmutable() {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void getKey(ImmutableBytesWritable ptr) {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public Cell getValue(int index) {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public Cell getValue(byte [] family, byte [] qualifier) {
+        throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public boolean getValue(byte [] family, byte [] qualifier, ImmutableBytesWritable ptr) {
+        throw new UnsupportedOperationException();
+    }
 
     @Override
     public long getSequenceValue(int index) {
         throw new UnsupportedOperationException();
     }
+    
+    @Override
+    public void setKeyValues(List<Cell> values) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
index 58b1eda..3430f5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/DelegateTuple.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import java.util.List;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
@@ -61,4 +63,9 @@ public class DelegateTuple implements Tuple {
     public long getSequenceValue(int index) {
         return delegate.getSequenceValue(index);
     }
+
+    @Override
+    public void setKeyValues(List<Cell> values) {
+        delegate.setKeyValues(values);
+    }
 }