You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/03/09 23:02:25 UTC

[12/50] [abbrv] phoenix git commit: PHOENIX-1598 Encode column names to save space and improve performance

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index fdf5498..a41dbf0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -65,7 +65,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
@@ -244,8 +243,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 for (ColumnReference ref : mutableColumns) {
                     scan.addColumn(ref.getFamily(), ref.getQualifier());
                 }
+                /*
+                 * Indexes inherit the storage scheme of the data table which means all the indexes have the same
+                 * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
+                 * supporting new indexes over existing data tables to have a different storage scheme than the data
+                 * table.
+                 */
+                byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+                
                 // Project empty key value column
-                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
                 ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
                 scanRanges.initializeScan(scan);
                 TableName tableName = env.getRegion().getRegionInfo().getTable();
@@ -296,7 +303,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
         if (scanner != null) {
             Result result;
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+                    .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             // Process existing data table rows by removing the old index row and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
@@ -324,7 +332,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             // to generate point delete markers for all index rows that were added. We don't have Tephra
             // manage index rows in change sets because we don't want to be hit with the additional
             // memory hit and do not need to do conflict detection on index rows.
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
                 // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 51587f1..d563bc2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,12 +17,17 @@
  */
 package org.apache.phoenix.iterate;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.schema.PTable.IndexType.LOCAL;
+import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter;
+import static org.apache.phoenix.util.ScanUtil.hasDynamicColumns;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -30,6 +35,7 @@ import java.io.DataInputStream;
 import java.io.EOFException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -69,6 +75,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
 import org.apache.phoenix.filter.DistinctPrefixFilter;
+import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.parse.FilterableStatement;
@@ -76,12 +83,13 @@ import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
@@ -90,6 +98,8 @@ import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PrefixByteCodec;
 import org.apache.phoenix.util.PrefixByteDecoder;
@@ -157,7 +167,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return true;
     }
     
-    private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) {
+    private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) throws SQLException {
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
@@ -208,7 +218,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             // Project empty key value unless the column family containing it has
                             // been projected in its entirety.
                             if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
-                                scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                                scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst());
                             }
                         }
                     }
@@ -226,7 +236,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             if(offset!=null){
                 ScanUtil.addOffsetAttribute(scan, offset);
             }
-
             int cols = plan.getGroupBy().getOrderPreservingColumnCount();
             if (cols > 0 && keyOnlyFilter &&
                 !plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) &&
@@ -241,13 +250,93 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     ScanUtil.andFilterAtEnd(scan, new PageFilter(plan.getLimit()));
                 }
             }
-
+            scan.setAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME, new byte[]{table.getEncodingScheme().getSerializedMetadataValue()});
+            scan.setAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME, new byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()});
+            // we use this flag on the server side to determine which value column qualifier to use in the key value we return from server.
+            scan.setAttribute(BaseScannerRegionObserver.USE_NEW_VALUE_COLUMN_QUALIFIER, Bytes.toBytes(true));
+            // When analyzing the table, there is no look up for key values being done.
+            // So there is no point setting the range.
+            if (EncodedColumnsUtil.setQualifierRanges(table) && !ScanUtil.isAnalyzeTable(scan)) {
+                Pair<Integer, Integer> range = getEncodedQualifierRange(scan, context);
+                if (range != null) {
+                    scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, Bytes.toBytes(range.getFirst()));
+                    scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, Bytes.toBytes(range.getSecond()));
+                    ScanUtil.setQualifierRangesOnFilter(scan, range);
+                }
+            }
             if (optimizeProjection) {
                 optimizeProjection(context, scan, table, statement);
             }
         }
     }
+    
+    private static Pair<Integer, Integer> getEncodedQualifierRange(Scan scan, StatementContext context)
+            throws SQLException {
+        PTable table = context.getCurrentTable().getTable();
+        QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+        checkArgument(encodingScheme != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS,
+            "Method should only be used for tables using encoded column names");
+        Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
+        for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
+            byte[] cq = whereCol.getSecond();
+            if (cq != null) {
+                int qualifier = table.getEncodingScheme().decode(cq);
+                determineQualifierRange(qualifier, minMaxQualifiers);
+            }
+        }
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+
+        Map<String, Pair<Integer, Integer>> qualifierRanges = EncodedColumnsUtil.getFamilyQualifierRanges(table);
+        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+            if (entry.getValue() != null) {
+                for (byte[] cq : entry.getValue()) {
+                    if (cq != null) {
+                        int qualifier = table.getEncodingScheme().decode(cq);
+                        determineQualifierRange(qualifier, minMaxQualifiers);
+                    }
+                }
+            } else {
+                /*
+                 * All the columns of the column family are being projected. So we will need to
+                 * consider all the columns in the column family to determine the min-max range.
+                 */
+                String family = Bytes.toString(entry.getKey());
+                if (table.getType() == INDEX && table.getIndexType() == LOCAL && !IndexUtil.isLocalIndexFamily(family)) {
+                    //TODO: samarth confirm with James why do we need this hack here :(
+                    family = IndexUtil.getLocalIndexColumnFamily(family);
+                }
+                Pair<Integer, Integer> range = qualifierRanges.get(family);
+                if (range != null) {
+                    determineQualifierRange(range.getFirst(), minMaxQualifiers);
+                    determineQualifierRange(range.getSecond(), minMaxQualifiers);
+                }
+            }
+        }
+        if (minMaxQualifiers.getFirst() == null) {
+            return null;
+        }
+        return minMaxQualifiers;
+    }
 
+    /**
+     * 
+     * @param cq
+     * @param minMaxQualifiers
+     * @return true if the empty column was projected
+     */
+    private static void determineQualifierRange(Integer qualifier, Pair<Integer, Integer> minMaxQualifiers) {
+        if (minMaxQualifiers.getFirst() == null) {
+            minMaxQualifiers.setFirst(qualifier);
+            minMaxQualifiers.setSecond(qualifier);
+        } else {
+            if (minMaxQualifiers.getFirst() > qualifier) {
+                minMaxQualifiers.setFirst(qualifier);
+            } else if (minMaxQualifiers.getSecond() < qualifier) {
+                minMaxQualifiers.setSecond(qualifier);
+            }
+        }
+    }
+    
     private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
         Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
         // columnsTracker contain cf -> qualifiers which should get returned.
@@ -255,6 +344,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
         Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
         int referencedCfCount = familyMap.size();
+        QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+        ImmutableStorageScheme storageScheme = table.getImmutableStorageScheme();
+        BitSet trackedColumnsBitset = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) && !hasDynamicColumns(table) ? new BitSet(10) : null;
         boolean filteredColumnNotInProjection = false;
         for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
             byte[] filteredFamily = whereCol.getFirst();
@@ -295,6 +387,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 cols = new TreeSet<ImmutableBytesPtr>();
                 for (byte[] q : qs) {
                     cols.add(new ImmutableBytesPtr(q));
+                    if (trackedColumnsBitset != null) {
+                        int qualifier = encodingScheme.decode(q);
+                        trackedColumnsBitset.set(qualifier);
+                    }
                 }
             }
             columnsTracker.put(cf, cols);
@@ -343,8 +439,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             // in the scan in this case. We still want the other optimization that causes
             // the ExplicitColumnTracker not to be used, though.
             if (!statement.isAggregate() && filteredColumnNotInProjection) {
-                ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                        columnsTracker, conditionOnlyCfs));
+                ScanUtil.andFilterAtEnd(scan, 
+                        trackedColumnsBitset != null ? new EncodedQualifiersColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), trackedColumnsBitset, conditionOnlyCfs, table.getEncodingScheme()) : new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
+                        columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme())));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index 3293f65..1e5f09e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator {
         };
     }
     
-    private final static Tuple UNINITIALIZED = new ResultTuple();
+    private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE;
     private Tuple next = UNINITIALIZED;
     
     abstract protected Tuple advance() throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
index 8ada952..135ab26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
             return this.index;
         }
         
+        @Override
         public int size() {
             if (flushBuffer)
                 return flushedCount;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index da75bb7..5afdfea 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Function;
@@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
             }
             this.byteSize = queueEntries.getByteSize();
         } catch (IOException e) {
-            throw new SQLException("", e);
+            ServerUtil.createIOException(e.getMessage(), e);
         } finally {
             delegate.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 52fbe9c..a5a40e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,16 +24,27 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 
 public class RegionScannerResultIterator extends BaseResultIterator {
     private final RegionScanner scanner;
-
-    public RegionScannerResultIterator(RegionScanner scanner) {
+    private final Pair<Integer, Integer> minMaxQualifiers;
+    private final boolean useQualifierAsIndex;
+    private final QualifierEncodingScheme encodingScheme;
+    
+    public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
         this.scanner = scanner;
+        this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
+        this.minMaxQualifiers = minMaxQualifiers;
+        this.encodingScheme = encodingScheme;
     }
 
     @Override
@@ -43,7 +54,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
         synchronized (scanner) {
             try {
                 // TODO: size
-                List<Cell> results = new ArrayList<Cell>();
+                List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) :  new ArrayList<Cell>();
                 // Results are potentially returned even when the return value of s.next is false
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
@@ -54,7 +65,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
                 }
                 // We instantiate a new tuple because in all cases currently we hang on to it
                 // (i.e. to compute and hold onto the TopN).
-                MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+                Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
                 tuple.setKeyValues(results);
                 return tuple;
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 5142b57..0e62164 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
     public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
     public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP);
-
+    
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
 
@@ -322,6 +322,15 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     /** Version below which we fall back on the generic KeyValueBuilder */
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
     
+    public static final String IMMUTABLE_STORAGE_SCHEME = "IMMUTABLE_STORAGE_SCHEME";
+    public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(IMMUTABLE_STORAGE_SCHEME);
+    public static final String ENCODING_SCHEME = "ENCODING_SCHEME";
+    public static final byte[] ENCODING_SCHEME_BYTES = Bytes.toBytes(ENCODING_SCHEME);
+    public static final String COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+    public static final byte[] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER);
+    public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER";
+    public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
+
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
         this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;
@@ -595,9 +604,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
                 newCells.addAll(cells);
                 newCells.add(kv);
                 Collections.sort(newCells, KeyValue.COMPARATOR);
-                resultTuple.setResult(Result.create(newCells));
+                tuple = new ResultTuple(Result.create(newCells));
             }
-
             return tuple;
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 47c17ae..3ca48a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
     private final static String STRING_FALSE = "0";
     private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0);
     private final static Integer INTEGER_FALSE = Integer.valueOf(0);
-    private final static Tuple BEFORE_FIRST = new ResultTuple();
+    private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE;
 
     private final ResultIterator scanner;
     private final RowProjector rowProjector;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 3072736..a8ddd62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -56,7 +56,7 @@ public class HashCacheFactory implements ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException {
+    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException {
         try {
             // This reads the uncompressed length from the front of the compressed input
             int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index b12326a..278489d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -208,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     not care about it
      */
     private void initColumnIndexes() throws SQLException {
-        columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+        columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         int columnIndex = 0;
         for(int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
@@ -216,18 +217,22 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
                 byte[] family = new byte[0];
-                if (c.getFamilyName() != null)  // Skip PK column
+                byte[] cq;
+                if (!SchemaUtil.isPKColumn(c)) {
                     family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+                    cq = c.getColumnQualifierBytes();
+                } else {
+                    cq = c.getName().getBytes();
+                }
+                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
                 if (!columnIndexes.containsKey(cfn)) {
                     columnIndexes.put(cfn, new Integer(columnIndex));
                     columnIndex++;
                 }
             }
             byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
-            byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES,
-                    QueryConstants.EMPTY_COLUMN_BYTES);
+            byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+            byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue);
             columnIndexes.put(cfn, new Integer(columnIndex));
             columnIndex++;
         }
@@ -243,9 +248,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     private int findIndex(Cell cell) throws IOException {
         byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
                 cell.getFamilyLength());
-        byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+        byte[] cq = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
                 cell.getQualifierLength());
-        byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
+        byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
         if(columnIndexes.containsKey(cfn)) {
             return columnIndexes.get(cfn);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 15d6d2f..c529afe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -89,7 +90,7 @@ public class FormatToKeyValueReducer
     }
 
     private void initColumnsMap(PhoenixConnection conn) throws SQLException {
-        Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+        Map<byte[], Integer> indexMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
         columnIndexes = new HashMap<>();
         int columnIndex = 0;
         for (int index = 0; index < logicalNames.size(); index++) {
@@ -98,12 +99,16 @@ public class FormatToKeyValueReducer
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
                 byte[] family = new byte[0];
-                if (c.getFamilyName() != null) {
+                byte[] cq;
+                if (!SchemaUtil.isPKColumn(c)) {
                     family = c.getFamilyName().getBytes();
+                    cq = c.getColumnQualifierBytes();
+                } else {
+                    // TODO: samarth verify if this is the right thing to do here.
+                    cq = c.getName().getBytes();
                 }
-                byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
-                Pair<byte[], byte[]> pair = new Pair(family, name);
+                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq);
+                Pair<byte[], byte[]> pair = new Pair<>(family, cq);
                 if (!indexMap.containsKey(cfn)) {
                     indexMap.put(cfn, new Integer(columnIndex));
                     columnIndexes.put(new Integer(columnIndex), pair);
@@ -111,8 +116,8 @@ public class FormatToKeyValueReducer
                 }
             }
             byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
-            Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants
-                    .EMPTY_COLUMN_BYTES);
+            byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+            Pair<byte[], byte[]> pair = new Pair<>(emptyColumnFamily, emptyKeyValue);
             columnIndexes.put(new Integer(columnIndex), pair);
             columnIndex++;
         }
@@ -123,18 +128,17 @@ public class FormatToKeyValueReducer
                           Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
             throws IOException, InterruptedException {
         TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-        ImmutableBytesWritable rowKey = key.getRowkey();
         for (ImmutableBytesWritable aggregatedArray : values) {
             DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
             while (input.available() != 0) {
                 byte type = input.readByte();
                 int index = WritableUtils.readVInt(input);
                 ImmutableBytesWritable family;
-                ImmutableBytesWritable name;
+                ImmutableBytesWritable cq;
                 ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
                 Pair<byte[], byte[]> pair = columnIndexes.get(index);
                 family = new ImmutableBytesWritable(pair.getFirst());
-                name = new ImmutableBytesWritable(pair.getSecond());
+                cq = new ImmutableBytesWritable(pair.getSecond());
                 int len = WritableUtils.readVInt(input);
                 if (len > 0) {
                     byte[] array = new byte[len];
@@ -145,10 +149,10 @@ public class FormatToKeyValueReducer
                 KeyValue.Type kvType = KeyValue.Type.codeToType(type);
                 switch (kvType) {
                     case Put: // not null value
-                        kv = builder.buildPut(key.getRowkey(), family, name, value);
+                        kv = builder.buildPut(key.getRowkey(), family, cq, value);
                         break;
                     case DeleteColumn: // null value
-                        kv = builder.buildDeleteColumns(key.getRowkey(), family, name);
+                        kv = builder.buildDeleteColumns(key.getRowkey(), family, cq);
                         break;
                     default:
                         throw new IOException("Unsupported KeyValue type " + kvType);
@@ -164,4 +168,4 @@ public class FormatToKeyValueReducer
             if (++index % 100 == 0) context.setStatus("Wrote " + index);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index 47a38a7..54dc748 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -113,7 +113,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
                     if (put == null) {
                         put = new Put(CellUtil.cloneRow(cell));
                         put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                        put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                        put.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
                         put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
                         mutations.add(put);
                     }
@@ -122,7 +122,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
                     if (del == null) {
                         del = new Delete(CellUtil.cloneRow(cell));
                         del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                        del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                        del.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
                         del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
                         mutations.add(del);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 70c0575..2329432 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -23,11 +23,34 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_SERVICES_COUNTER;
+import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE;
@@ -41,6 +64,7 @@ import java.lang.ref.WeakReference;
 import java.sql.PreparedStatement;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -160,11 +184,13 @@ import org.apache.phoenix.schema.ColumnAlreadyExistsException;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.EmptySequenceCacheException;
 import org.apache.phoenix.schema.FunctionNotFoundException;
+import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.NewerSchemaAlreadyExistsException;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PMetaDataImpl;
 import org.apache.phoenix.schema.PName;
@@ -178,6 +204,7 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableProperty;
@@ -187,7 +214,9 @@ import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
@@ -588,7 +617,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             latestMetaDataLock.notifyAll();
         }
     }
-
     @Override
     public void updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
         synchronized (latestMetaDataLock) {
@@ -2708,6 +2736,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0);
                     clearCache();
                 }
+                if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0) {
+                    metaConnection = addColumnQualifierColumn(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 3);
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 2,
+                            PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME + " "
+                                    + PTinyint.INSTANCE.getSqlTypeName());
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 - 1,
+                            PhoenixDatabaseMetaData.ENCODING_SCHEME + " "
+                                    + PTinyint.INSTANCE.getSqlTypeName());
+                    metaConnection = addColumnsIfNotExists(
+                            metaConnection,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0,
+                            PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER + " "
+                                    + PInteger.INSTANCE.getSqlTypeName());
+                    ConnectionQueryServicesImpl.this.removeTable(null,
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null,
+                            MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0);
+                    clearCache();
+                }
             }
 
 
@@ -2844,6 +2897,84 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
         }
     }
+    
+    // Special method for adding the column qualifier column for 4.10. 
+    private PhoenixConnection addColumnQualifierColumn(PhoenixConnection oldMetaConnection, Long timestamp) throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
+        // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again
+        PhoenixConnection metaConnection = new PhoenixConnection(oldMetaConnection, this, props);
+        PTable sysCatalogPTable = metaConnection.getTable(new PTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
+        int numColumns = sysCatalogPTable.getColumns().size();
+        try (PreparedStatement mutateTable = metaConnection.prepareStatement(MetaDataClient.MUTATE_TABLE)) {
+            mutateTable.setString(1, null);
+            mutateTable.setString(2, SYSTEM_CATALOG_SCHEMA);
+            mutateTable.setString(3, SYSTEM_CATALOG_TABLE);
+            mutateTable.setString(4, PTableType.SYSTEM.getSerializedValue());
+            mutateTable.setLong(5, sysCatalogPTable.getSequenceNumber() + 1);
+            mutateTable.setInt(6, numColumns + 1);
+            mutateTable.execute();
+        }
+        List<Mutation> tableMetadata = new ArrayList<>();
+        tableMetadata.addAll(metaConnection.getMutationState().toMutations(metaConnection.getSCN()).next().getSecond());
+        metaConnection.rollback();
+        PColumn column = new PColumnImpl(PNameFactory.newName("COLUMN_QUALIFIER"),
+                PNameFactory.newName(DEFAULT_COLUMN_FAMILY_NAME), PVarbinary.INSTANCE, null, null, true, numColumns,
+                SortOrder.ASC, null, null, false, null, false, false, 
+                Bytes.toBytes("COLUMN_QUALIFIER"));
+        String upsertColumnMetadata = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+                TENANT_ID + "," +
+                TABLE_SCHEM + "," +
+                TABLE_NAME + "," +
+                COLUMN_NAME + "," +
+                COLUMN_FAMILY + "," +
+                DATA_TYPE + "," +
+                NULLABLE + "," +
+                COLUMN_SIZE + "," +
+                DECIMAL_DIGITS + "," +
+                ORDINAL_POSITION + "," +
+                SORT_ORDER + "," +
+                DATA_TABLE_NAME + "," +
+                ARRAY_SIZE + "," +
+                VIEW_CONSTANT + "," +
+                IS_VIEW_REFERENCED + "," +
+                PK_NAME + "," +
+                KEY_SEQ + "," +
+                COLUMN_DEF + "," +
+                IS_ROW_TIMESTAMP +
+                ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+        try (PreparedStatement colUpsert = metaConnection.prepareStatement(upsertColumnMetadata)) {
+            colUpsert.setString(1, null);
+            colUpsert.setString(2, SYSTEM_CATALOG_SCHEMA);
+            colUpsert.setString(3, SYSTEM_CATALOG_TABLE);
+            colUpsert.setString(4, "COLUMN_QUALIFIER");
+            colUpsert.setString(5, DEFAULT_COLUMN_FAMILY);
+            colUpsert.setInt(6, column.getDataType().getSqlType());
+            colUpsert.setInt(7, ResultSetMetaData.columnNullable);
+            colUpsert.setNull(8, Types.INTEGER);
+            colUpsert.setNull(9, Types.INTEGER);
+            colUpsert.setInt(10, sysCatalogPTable.getBucketNum() != null ? numColumns : (numColumns + 1));
+            colUpsert.setInt(11, SortOrder.ASC.getSystemValue());
+            colUpsert.setString(12, null);
+            colUpsert.setNull(13, Types.INTEGER);
+            colUpsert.setBytes(14, null);
+            colUpsert.setBoolean(15, false);
+            colUpsert.setString(16, sysCatalogPTable.getPKName() == null ? null : sysCatalogPTable.getPKName().getString());
+            colUpsert.setNull(17, Types.SMALLINT);
+            colUpsert.setNull(18, Types.VARCHAR);
+            colUpsert.setBoolean(19, false);
+            colUpsert.execute();
+        }
+        tableMetadata.addAll(metaConnection.getMutationState().toMutations(metaConnection.getSCN()).next().getSecond());
+        metaConnection.rollback();
+        metaConnection.getQueryServices().addColumn(tableMetadata, sysCatalogPTable, Collections.<String,List<Pair<String,Object>>>emptyMap(), Collections.<String>emptySet(), Lists.newArrayList(column));
+        metaConnection.removeTable(null, SYSTEM_CATALOG_NAME, null, timestamp);
+        ConnectionQueryServicesImpl.this.removeTable(null,
+                SYSTEM_CATALOG_NAME, null,
+                timestamp);
+        clearCache();
+        return metaConnection;
+    }
 
     private void createSnapshot(String snapshotName, String tableName)
             throws SQLException {
@@ -4137,4 +4268,4 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     public Configuration getConfiguration() {
         return config;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 7c78083..47ef954 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -93,7 +93,6 @@ import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-
 /**
  *
  * Implementation of ConnectionQueryServices used in testing where no connection to

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 8e2dc1a..6f105f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -31,6 +31,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
@@ -40,11 +42,13 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POST_KEY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
@@ -117,8 +121,8 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.util.ByteUtil;
 
 
 /**
@@ -149,23 +153,30 @@ public interface QueryConstants {
     public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY);
     public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
 
-    public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
-    public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");
-    public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes();
-    public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes();
-
     public static final long AGG_TIMESTAMP = HConstants.LATEST_TIMESTAMP;
     /**
      * Key used for a single row aggregation where there is no group by
      */
     public final static byte[] UNGROUPED_AGG_ROW_KEY = Bytes.toBytes("a");
-    public final static PName AGG_COLUMN_NAME = SINGLE_COLUMN_NAME;
-    public final static PName AGG_COLUMN_FAMILY_NAME = SINGLE_COLUMN_FAMILY_NAME;
-
-    public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = Bytes.toBytes("a");
-    // Use empty byte array for column qualifier so as not to accidentally conflict with any other columns
-    public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY;
+    
+    /** BEGIN Set of reserved column qualifiers **/
+    
+    public static final String RESERVED_COLUMN_FAMILY = "_v";
+    public static final byte[] RESERVED_COLUMN_FAMILY_BYTES = Bytes.toBytes(RESERVED_COLUMN_FAMILY);
+    
+    public static final byte[] VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+    public static final byte[] VALUE_COLUMN_QUALIFIER = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(1);
+    
+    public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES;
+    public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(2);
+    
+    public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
+    public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");
+    public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes();
+    public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes();
 
+    /** END Set of reserved column qualifiers **/
+    
     public static final byte[] TRUE = new byte[] {1};
     
     /**
@@ -192,15 +203,25 @@ public interface QueryConstants {
     public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME);
     public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_BYTES);
+    public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0;
+    public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = QualifierEncodingScheme.FOUR_BYTE_QUALIFIERS.encode(ENCODED_EMPTY_COLUMN_NAME);
     public final static String EMPTY_COLUMN_VALUE = "x";
     public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
     public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_VALUE_BYTES);
-
+    public static final String ENCODED_EMPTY_COLUMN_VALUE = EMPTY_COLUMN_VALUE;
+    public final static byte[] ENCODED_EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
+    public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
+            ENCODED_EMPTY_COLUMN_VALUE_BYTES);
     public static final String DEFAULT_COLUMN_FAMILY = "0";
     public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
     public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
             DEFAULT_COLUMN_FAMILY_BYTES);
+    // column qualifier of the single key value used to store all columns for the COLUMNS_STORED_IN_SINGLE_CELL storage scheme
+    public static final String SINGLE_KEYVALUE_COLUMN_QUALIFIER = "1";
+    public final static byte[] SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES = Bytes.toBytes(SINGLE_KEYVALUE_COLUMN_QUALIFIER);
+    public static final ImmutableBytesPtr SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR = new ImmutableBytesPtr(
+            SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES);
 
     public static final String LOCAL_INDEX_COLUMN_FAMILY_PREFIX = "L#";
     public static final byte[] LOCAL_INDEX_COLUMN_FAMILY_PREFIX_BYTES = Bytes.toBytes(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
@@ -222,6 +243,12 @@ public interface QueryConstants {
     public static final int NANOS_IN_SECOND = BigDecimal.valueOf(Math.pow(10, 9)).intValue();
     public static final int DIVERGED_VIEW_BASE_COLUMN_COUNT = -100;
     public static final int BASE_TABLE_BASE_COLUMN_COUNT = -1;
+    
+    /**
+     * We mark counter values 0 to 10 as reserved. Value 0 is used by {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10
+     * are reserved for special column qualifiers returned by Phoenix co-processors.
+     */
+    public static final int ENCODED_CQ_COUNTER_INITIAL_VALUE = 11;
     public static final String CREATE_TABLE_METADATA =
             // Do not use IF NOT EXISTS as we sometimes catch the TableAlreadyExists
             // exception and add columns to the SYSTEM.TABLE dynamically.
@@ -289,6 +316,10 @@ public interface QueryConstants {
             AUTO_PARTITION_SEQ + " VARCHAR," +
             APPEND_ONLY_SCHEMA + " BOOLEAN," +
             GUIDE_POSTS_WIDTH + " BIGINT," +
+            COLUMN_QUALIFIER + " VARBINARY," +
+            IMMUTABLE_STORAGE_SCHEME + " TINYINT, " +
+            ENCODING_SCHEME + " TINYINT, " +
+            COLUMN_QUALIFIER_COUNTER + " INTEGER, " +
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
@@ -363,5 +394,5 @@ public interface QueryConstants {
     public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
     public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
     public static final String LAST_SCAN = "LAST_SCAN";
-
+    public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2035de8..8f0b06e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -240,6 +240,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String CLIENT_CONNECTION_CACHE_MAX_SIZE = "phoenix.client.connection.cache.max.size";
     public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS =
         "phoenix.client.connection.max.duration";
+    public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB  = "phoenix.default.column.encoded.bytes.attrib";
+    public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.immutable.storage.scheme";
+    public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.multitenant.immutable.storage.scheme";
+
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index de0796f..15ea956 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -93,6 +93,8 @@ import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableRefFactory;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.DateUtil;
@@ -251,7 +253,6 @@ public class QueryServicesOptions {
     public static final long DEFAULT_QUERY_SERVER_UGI_CACHE_MAX_SIZE = 1000L;
     public static final int DEFAULT_QUERY_SERVER_UGI_CACHE_INITIAL_SIZE = 100;
     public static final int DEFAULT_QUERY_SERVER_UGI_CACHE_CONCURRENCY = 10;
-
     public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true;
     public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS =
             DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2;
@@ -267,6 +268,9 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true;
     public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE = 100;
     public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000;
+    public static final int DEFAULT_COLUMN_ENCODED_BYTES = QualifierEncodingScheme.TWO_BYTE_QUALIFIERS.getSerializedMetadataValue();
+    public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.toString();
+    public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.ONE_CELL_PER_COLUMN.toString();
 
     @SuppressWarnings("serial")
     public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 1d772b4..c73b860 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -17,6 +17,10 @@
  */
 package org.apache.phoenix.schema;
 
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.http.annotation.Immutable;
 import org.apache.phoenix.compile.ExpressionCompiler;
@@ -25,19 +29,17 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.function.DefaultValueExpression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Arrays;
-
 
 /**
  * 
@@ -59,7 +61,7 @@ public class ColumnRef {
     }
 
     public ColumnRef(TableRef tableRef, String familyName, String columnName) throws MetaDataEntityNotFoundException {
-        this(tableRef, tableRef.getTable().getColumnFamily(familyName).getColumn(columnName).getPosition());
+        this(tableRef, tableRef.getTable().getColumnFamily(familyName).getPColumnForColumnName(columnName).getPosition());
     }
 
     public ColumnRef(TableRef tableRef, int columnPosition) {
@@ -123,7 +125,8 @@ public class ColumnRef {
         	return new ProjectedColumnExpression(column, table, displayName);
         }
 
-        Expression expression = new KeyValueColumnExpression(column, displayName);
+        Expression expression = table.getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS ? 
+        		new SingleCellColumnExpression(column, displayName, table.getEncodingScheme()) : new KeyValueColumnExpression(column, displayName);
 
         if (column.getExpressionStr() != null) {
             String url = PhoenixRuntime.JDBC_PROTOCOL
@@ -140,7 +143,6 @@ public class ColumnRef {
                 return new DefaultValueExpression(Arrays.asList(expression, defaultExpression));
             }
         }
-       
         return expression;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java
new file mode 100644
index 0000000..5ae72d1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueDecoder.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+/**
+ * Interface to decode column values that are stored in a byte[] 
+ */
+public interface ColumnValueDecoder {
+    /**
+     * sets the ptr to the column value at the given index  
+     * @return false if the column value is absent (used to support DEFAULT expressions) or else true 
+     */
+    boolean decode(ImmutableBytesWritable ptr, int index);
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java
new file mode 100644
index 0000000..5e930bd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnValueEncoder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+
+
+/**
+ * Interface to encode column values into a serialized byte[] that will be stored in a single cell
+ * The last byte of the serialized byte[] should be the serialized value of the {@link ImmutableStorageScheme}
+ * that was used.
+ */
+public interface ColumnValueEncoder {
+    
+    /**
+     * append a column value to the array
+     */
+    void appendValue(byte[] bytes, int offset, int length);
+    
+    /**
+     * append a value that is not present to the array (used to support DEFAULT expressions)
+     */
+    void appendAbsentValue();
+    
+    /**
+     * @return the encoded byte[] that contains the serialized column values
+     */
+    byte[] encode();
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index aca8219..5c9cc2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -99,4 +99,8 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
     public boolean equals(Object o) {
 	    return getDelegate().equals(o);
 	}
+    @Override
+    public byte[] getColumnQualifierBytes() {
+        return getDelegate().getColumnQualifierBytes();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 7d39dfe..12258e1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -96,8 +96,8 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
-        return delegate.getColumn(name);
+    public PColumn getColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+        return delegate.getColumnForColumnName(name);
     }
 
     @Override
@@ -290,4 +290,24 @@ public class DelegateTable implements PTable {
     public boolean equals(Object obj) {
         return delegate.equals(obj);
     }
+    
+    @Override
+    public ImmutableStorageScheme getImmutableStorageScheme() {
+        return delegate.getImmutableStorageScheme();
+    }
+
+    @Override
+    public PColumn getColumnForColumnQualifier(byte[] cf, byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+        return delegate.getColumnForColumnQualifier(cf, cq);
+    }
+
+    @Override
+    public EncodedCQCounter getEncodedCQCounter() {
+        return delegate.getEncodedCQCounter();
+    }
+
+    @Override
+    public QualifierEncodingScheme getEncodingScheme() {
+        return delegate.getEncodingScheme();
+    }
 }