You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/14 23:42:28 UTC

[39/50] [abbrv] phoenix git commit: PHOENIX-1598 Column encoding to save space and improve performance

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 237ed75..d3a3ca4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -17,7 +17,10 @@
  */
 package org.apache.phoenix.index;
 
+import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -29,9 +32,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
@@ -43,17 +48,24 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
+import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ColumnInfo;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
+import org.apache.phoenix.expression.SingleCellConstructorExpression;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -67,14 +79,17 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.AmbiguousColumnException;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
@@ -82,10 +97,12 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -93,6 +110,7 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 import org.apache.tephra.TxConstants;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -105,10 +123,10 @@ import com.google.common.collect.Sets;
  * row and caches any covered columns. Client-side serializes into byte array using 
  * @link #serialize(PTable, ImmutableBytesWritable)}
  * and transmits to server-side through either the 
- * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_MD}
+ * {@link org.apache.phoenix.index.PhoenixIndexCodec#INDEX_PROTO_MD}
  * Mutation attribute or as a separate RPC call using 
  * {@link org.apache.phoenix.cache.ServerCacheClient})
- *
+ * 
  * 
  * @since 2.1.0
  */
@@ -116,8 +134,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
 
     private static final int EXPRESSION_NOT_PRESENT = -1;
     private static final int ESTIMATED_EXPRESSION_SIZE = 8;
-
-	public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) {
+    
+    public static IndexMaintainer create(PTable dataTable, PTable index, PhoenixConnection connection) {
         if (dataTable.getType() == PTableType.INDEX || index.getType() != PTableType.INDEX || !dataTable.getIndexes().contains(index)) {
             throw new IllegalArgumentException();
         }
@@ -179,14 +197,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         }
         int nIndexes = 0;
-        int estimatedSize = dataTable.getRowKeySchema().getEstimatedByteSize() + 2;
         while (indexesItr.hasNext()) {
             nIndexes++;
-            PTable index = indexesItr.next();
-            estimatedSize += index.getIndexMaintainer(dataTable, connection).getEstimatedByteSize();
+            indexesItr.next();
         }
-        TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedSize + 1);
-        DataOutput output = new DataOutputStream(stream);
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        DataOutputStream output = new DataOutputStream(stream);
         try {
             // Encode data table salting in sign of number of indexes
             WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1));
@@ -196,15 +212,23 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                     dataTable.isImmutableRows() ? enabledLocalIndexIterator(indexes.iterator())
                             : nonDisabledIndexIterator(indexes.iterator());
             while (indexesItr.hasNext()) {
-                    indexesItr.next().getIndexMaintainer(dataTable, connection).write(output);
+                    org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection));
+                    byte[] protoBytes = proto.toByteArray();
+                    WritableUtils.writeVInt(output, protoBytes.length);
+                    output.write(protoBytes);
             }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }
-        ptr.set(stream.getBuffer(), 0, stream.size());
+        ptr.set(stream.toByteArray(), 0, stream.size());
     }
     
-
+    /**
+     * For client-side to append serialized IndexMaintainers of keyValueIndexes
+     * @param table data table
+     * @param indexMetaDataPtr bytes pointer to hold returned serialized value
+     * @param keyValueIndexes indexes to serialize
+     */
     public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr,
             List<PTable> keyValueIndexes, PhoenixConnection connection) {
         int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr);
@@ -230,7 +254,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
             // Serialize mutable indexes afterwards
             for (PTable index : keyValueIndexes) {
-                index.getIndexMaintainer(table, connection).write(output);
+                IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+                byte[] protoBytes = IndexMaintainer.toProto(maintainer).toByteArray();
+                WritableUtils.writeVInt(output, protoBytes.length);
+                output.write(protoBytes);
             }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
@@ -239,15 +266,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
     
     public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr,
-            KeyValueBuilder builder) {
-        return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength());
+            KeyValueBuilder builder, boolean useProtoForIndexMaintainer) {
+        return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength(), useProtoForIndexMaintainer);
     }
     
-    public static List<IndexMaintainer> deserialize(byte[] buf) {
-        return deserialize(buf, 0, buf.length);
+    public static List<IndexMaintainer> deserialize(byte[] buf, boolean useProtoForIndexMaintainer) {
+        return deserialize(buf, 0, buf.length, useProtoForIndexMaintainer);
     }
 
-    public static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length) {
+    private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) {
         ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length);
         DataInput input = new DataInputStream(stream);
         List<IndexMaintainer> maintainers = Collections.emptyList();
@@ -259,25 +286,31 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             rowKeySchema.readFields(input);
             maintainers = Lists.newArrayListWithExpectedSize(size);
             for (int i = 0; i < size; i++) {
-                IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted);
-                maintainer.readFields(input);
-                maintainers.add(maintainer);
+                if (useProtoForIndexMaintainer) {
+                  int protoSize = WritableUtils.readVInt(input);
+                  byte[] b = new byte[protoSize];
+                  input.readFully(b);
+                  org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b);
+                  maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted));
+                } else {
+                    IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted);
+                    maintainer.readFields(input);
+                    maintainers.add(maintainer);
+                }
             }
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }
         return maintainers;
     }
-
+    
     private byte[] viewIndexId;
     private boolean isMultiTenant;
     // indexed expressions that are not present in the row key of the data table, the expression can also refer to a regular column
     private List<Expression> indexedExpressions;
     // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key)
     private Set<ColumnReference> indexedColumns;
-    private Set<ColumnReference> coveredColumns;
-    // Map used to cache column family of data table and the corresponding column family for the local index
-    private Map<ImmutableBytesPtr, ImmutableBytesWritable> dataTableLocalIndexFamilyMap;
+    
     // columns required to create index row i.e. indexedColumns + coveredColumns  (this does not include columns in the data row key)
     private Set<ColumnReference> allColumns;
     // TODO remove this in the next major release
@@ -291,12 +324,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private boolean indexWALDisabled;
     private boolean isLocalIndex;
     private boolean immutableRows;
-
     // Transient state
     private final boolean isDataTableSalted;
     private final RowKeySchema dataRowKeySchema;
     
-    private List<ImmutableBytesPtr> indexQualifiers;
     private int estimatedIndexRowKeyBytes;
     private int estimatedExpressionSize;
     private int[] dataPkPosition;
@@ -304,26 +335,48 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private ColumnReference dataEmptyKeyValueRef;
     private boolean rowKeyOrderOptimizable;
     
+    /**** START: New member variables added in 4.10 *****/ 
+    private QualifierEncodingScheme encodingScheme;
+    private ImmutableStorageScheme immutableStorageScheme;
+    /*
+     * Information for columns of data tables that are being indexed. The first part of the pair is the column family
+     * name (null for data table PK columns) and the second part is the column name. We need to track this state because, for certain storage schemes
+     * like ImmutableStorageScheme#SINGLE_CELL_ARRAY_WITH_OFFSETS, the column for which we need to generate an index
+     * table put/delete is different from the columns that are indexed in the phoenix schema. This information helps us
+     * determine whether or not certain operations like DROP COLUMN should impact the index.
+     */
+    private Set<Pair<String, String>> indexedColumnsInfo;
+    /*
+     * Map of covered columns where a key is column reference for a column in the data table
+     * and value is column reference for corresponding column in the index table.
+     */
+    private Map<ColumnReference, ColumnReference> coveredColumnsMap;
+    /**** END: New member variables added in 4.10 *****/
+
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
         this.isDataTableSalted = isDataTableSalted;
     }
-
-    private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) {
+    
+    private IndexMaintainer(final PTable dataTable, final PTable index, PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
         assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW);
         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
-
+        this.encodingScheme = index.getEncodingScheme();
+        
+        // null check for b/w compatibility
+        this.encodingScheme = index.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : index.getEncodingScheme();
+        this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme();
+        
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
         Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum();
         boolean indexWALDisabled = index.isWALDisabled();
         int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1);
-//        int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
         int nIndexColumns = index.getColumns().size() - indexPosOffset;
         int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset;
         // number of expressions that are indexed that are not present in the row key of the data table
@@ -334,7 +387,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName);
             String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
             try {
-                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName);
+                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName);
                 if (SchemaUtil.isPKColumn(dataColumn)) 
                     continue;
             } catch (ColumnNotFoundException e) {
@@ -366,8 +419,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.indexTableName = indexTableName;
         this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
         this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns);
-        this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns);
-        this.dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns);
+        this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns);
         this.nIndexSaltBuckets  = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets;
         this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable);
         this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index);
@@ -397,6 +449,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             throw new RuntimeException(e); // Impossible
         }
         StatementContext context = new StatementContext(new PhoenixStatement(connection), resolver);
+        this.indexedColumnsInfo = Sets.newHashSetWithExpectedSize(nIndexColumns - nIndexPKColumns);
+        
         IndexExpressionCompiler expressionIndexCompiler = new IndexExpressionCompiler(context);
         for (int i = indexPosOffset; i < index.getPKColumns().size(); i++) {
             PColumn indexColumn = index.getPKColumns().get(i);
@@ -409,12 +463,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 throw new RuntimeException(e); // Impossible
             }
             if ( expressionIndexCompiler.getColumnRef()!=null ) {
-            	// get the column of the data table that corresponds to this index column
+            	// get the column of the data table that corresponds to this index column
 	            PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
 	            boolean isPKColumn = SchemaUtil.isPKColumn(column);
 	            if (isPKColumn) {
 	                int dataPkPos = dataTable.getPKColumns().indexOf(column) - (dataTable.getBucketNum() == null ? 0 : 1) - (this.isMultiTenant ? 1 : 0);
 	                this.rowKeyMetaData.setIndexPkPosition(dataPkPos, indexPos);
+	                indexedColumnsInfo.add(new Pair<>((String)null, column.getName().getString()));
 	            } else {
 	                indexColByteSize += column.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(column) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
 	                try {
@@ -424,6 +479,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
 	                        expression = CoerceExpression.create(expression, indexColumn.getDataType());
 	                    }
                         this.indexedExpressions.add(expression);
+                        indexedColumnsInfo.add(new Pair<>(column.getFamilyName().getString(), column.getName().getString()));
                     } catch (SQLException e) {
                         throw new RuntimeException(e); // Impossible
                     }
@@ -432,6 +488,45 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             else {
             	indexColByteSize += expression.getDataType().isFixedWidth() ? SchemaUtil.getFixedByteSize(expression) : ValueSchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
                 this.indexedExpressions.add(expression);
+                KeyValueExpressionVisitor kvVisitor = new KeyValueExpressionVisitor() {
+                    @Override
+                    public Void visit(KeyValueColumnExpression colExpression) {
+                        return addDataColInfo(dataTable, colExpression);
+                    }
+
+                    @Override
+                    public Void visit(SingleCellColumnExpression expression) {
+                        return addDataColInfo(dataTable, expression);
+                    }
+
+                    private Void addDataColInfo(final PTable dataTable, Expression expression) {
+                        Preconditions.checkArgument(expression instanceof SingleCellColumnExpression
+                                || expression instanceof KeyValueColumnExpression);
+
+                        KeyValueColumnExpression colExpression = null;
+                        if (expression instanceof SingleCellColumnExpression) {
+                            colExpression =
+                                    ((SingleCellColumnExpression) expression).getKeyValueExpression();
+                        } else {
+                            colExpression = ((KeyValueColumnExpression) expression);
+                        }
+                        byte[] cf = colExpression.getColumnFamily();
+                        byte[] cq = colExpression.getColumnQualifier();
+                        try {
+                            PColumn dataColumn =
+                                    cf == null ? dataTable.getPColumnForColumnQualifier(null, cq)
+                                            : dataTable.getColumnFamily(cf)
+                                                    .getPColumnForColumnQualifier(cq);
+                            indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName()
+                                    .getString(), dataColumn.getName().getString()));
+                        } catch (ColumnNotFoundException | ColumnFamilyNotFoundException
+                                | AmbiguousColumnException e) {
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    }
+                };
+                expression.accept(kvVisitor);
             }
             // set the sort order of the expression correctly
             if (indexColumn.getSortOrder() == SortOrder.DESC) {
@@ -442,18 +537,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         for (int i = 0; i < index.getColumnFamilies().size(); i++) {
             PColumnFamily family = index.getColumnFamilies().get(i);
             for (PColumn indexColumn : family.getColumns()) {
-                PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
-                PName dataTableFamily = column.getFamilyName();
-                this.coveredColumns.add(new ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes()));
-                if(isLocalIndex) {
-                    this.dataTableLocalIndexFamilyMap.put(new ImmutableBytesPtr(dataTableFamily.getBytes()), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString()))));
-                }
+                PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
+                byte[] dataColumnCq = dataColumn.getColumnQualifierBytes();
+                byte[] indexColumnCq = indexColumn.getColumnQualifierBytes();
+                this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), 
+                        new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq));
             }
         }
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
         initCachedState();
     }
-
+    
     public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey)  {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         boolean prependRegionStartKey = isLocalIndex && regionStartKey != null;
@@ -854,37 +948,106 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         return indexRowKeySchema;
     }
-
+    
     public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+        byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
         Put put = null;
         // New row being inserted: add the empty key value
         if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) {
-            byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+                this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts,
                 // set the value to the empty column name
-                QueryConstants.EMPTY_COLUMN_BYTES_PTR));
+                dataEmptyKeyValueRef.getQualifierWritable()));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
-        int i = 0;
-        for (ColumnReference ref : this.getCoveredColumns()) {
-            ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
-            ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
-            byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
-            ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
-            if (value != null) {
+        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
+        if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
+            // map from index column family to list of pair of index column and data column (for covered columns)
+            Map<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> familyToColListMap = Maps.newHashMap();
+            for (ColumnReference ref : this.getCoveredColumns()) {
+                ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+                ImmutableBytesPtr cf = new ImmutableBytesPtr(indexColRef.getFamily());
+                if (!familyToColListMap.containsKey(cf)) {
+                    familyToColListMap.put(cf, Lists.<Pair<ColumnReference, ColumnReference>>newArrayList());
+                }
+                familyToColListMap.get(cf).add(Pair.newPair(indexColRef, ref));
+            }
+            // iterate over each column family and create a byte[] containing all the columns 
+            for (Entry<ImmutableBytesPtr, List<Pair<ColumnReference, ColumnReference>>> entry : familyToColListMap.entrySet()) {
+                byte[] columnFamily = entry.getKey().copyBytesIfNecessary();
+                List<Pair<ColumnReference, ColumnReference>> colRefPairs = entry.getValue();
+                int maxEncodedColumnQualifier = Integer.MIN_VALUE;
+                // find the max col qualifier
+                for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
+                    maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, encodingScheme.decode(colRefPair.getFirst().getQualifier()));
+                }
+                Expression[] colValues = EncodedColumnsUtil.createColumnExpressionArray(maxEncodedColumnQualifier);
+                // set the values of the columns
+                for (Pair<ColumnReference, ColumnReference> colRefPair : colRefPairs) {
+                    ColumnReference indexColRef = colRefPair.getFirst();
+                    ColumnReference dataColRef = colRefPair.getSecond();
+                    Expression expression = new SingleCellColumnExpression(new PDatum() {
+                        @Override
+                        public boolean isNullable() {
+                            return false;
+                        }
+                        
+                        @Override
+                        public SortOrder getSortOrder() {
+                            return null;
+                        }
+                        
+                        @Override
+                        public Integer getScale() {
+                            return null;
+                        }
+                        
+                        @Override
+                        public Integer getMaxLength() {
+                            return null;
+                        }
+                        
+                        @Override
+                        public PDataType getDataType() {
+                            return null;
+                        }
+                    }, dataColRef.getFamily(), dataColRef.getQualifier(), encodingScheme);
+                    ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                    expression.evaluate(new ValueGetterTuple(valueGetter), ptr);
+                    byte[] value = ptr.copyBytesIfNecessary();
+                    if (value != null) {
+                        int indexArrayPos = encodingScheme.decode(indexColRef.getQualifier())-QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE+1;
+                        colValues[indexArrayPos] = new LiteralExpression(value);
+                    }
+                }
+                
+                List<Expression> children = Arrays.asList(colValues);
+                // we use SingleCellConstructorExpression to serialize multiple columns into a single byte[]
+                SingleCellConstructorExpression singleCellConstructorExpression = new SingleCellConstructorExpression(immutableStorageScheme, children);
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                singleCellConstructorExpression.evaluate(new BaseTuple() {}, ptr);
                 if (put == null) {
                     put = new Put(indexRowKey);
                     put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                 }
+                ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily);
                 //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC
-                if(this.isLocalIndex) {
-                    ImmutableBytesWritable localIndexColFamily = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable());
-                    put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, cq, ts, value));
-                } else {
-                    put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value));
+                put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES_PTR, ts, ptr));
+            }
+        } else {
+            for (ColumnReference ref : this.getCoveredColumns()) {
+                ColumnReference indexColRef = this.coveredColumnsMap.get(ref);
+                ImmutableBytesPtr cq = indexColRef.getQualifierWritable();
+                ImmutableBytesPtr cf = indexColRef.getFamilyWritable();
+                ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
+                if (value != null) {
+                    if (put == null) {
+                        put = new Put(indexRowKey);
+                        put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+                    }
+                    put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value));
                 }
             }
         }
@@ -962,7 +1125,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<KeyValue>emptyList(), ts, null, null);
     }
     
-    @SuppressWarnings("deprecation")
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey);
         // Delete the entire row if any of the indexed columns changed
@@ -972,15 +1134,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             Delete delete = new Delete(indexRowKey);
             
             for (ColumnReference ref : getCoveredColumns()) {
-                byte[] family = ref.getFamily();
-                if (this.isLocalIndex) {
-                    family = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get();
-                }
+                ColumnReference indexColumn = coveredColumnsMap.get(ref);
                 // If table delete was single version, then index delete should be as well
                 if (deleteType == DeleteType.SINGLE_VERSION) {
-                    delete.deleteFamilyVersion(family, ts);
+                    delete.deleteFamilyVersion(indexColumn.getFamily(), ts);
                 } else {
-                    delete.deleteFamily(family, ts);
+                    delete.deleteFamily(indexColumn.getFamily(), ts);
                 }
             }
             if (deleteType == DeleteType.SINGLE_VERSION) {
@@ -992,34 +1151,35 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             return delete;
         }
         Delete delete = null;
+        Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet();
         // Delete columns for missing key values
         for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
                 ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier());
-                if (coveredColumns.contains(ref)) {
+                if (dataTableColRefs.contains(ref)) {
                     if (delete == null) {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
-                    byte[] family = this.isLocalIndex ? this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : ref.getFamily();
+                    ColumnReference indexColumn = coveredColumnsMap.get(ref);
                     // If point delete for data table, then use point delete for index as well
-                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
-                        delete.deleteColumn(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { 
+                        delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts);
                     } else {
-                        delete.deleteColumns(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                        delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts);
                     }
                 }
             }
         }
         return delete;
     }
-
+    
     public byte[] getIndexTableName() {
         return indexTableName;
     }
     
     public Set<ColumnReference> getCoveredColumns() {
-        return coveredColumns;
+        return coveredColumnsMap.keySet();
     }
 
     public Set<ColumnReference> getAllColumns() {
@@ -1032,7 +1192,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         // If if there are no covered columns, we know it's our default name
         return emptyKeyValueCFPtr;
     }
-
+    
+    @Deprecated // Only called by code older than our 4.10 release
     @Override
     public void readFields(DataInput input) throws IOException {
         int encodedIndexSaltBucketsAndMultiTenant = WritableUtils.readVInt(input);
@@ -1060,16 +1221,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         int encodedCoveredolumnsAndLocalIndex = WritableUtils.readVInt(input);
         isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0;
         int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1;
-        coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns);
-        dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
+        coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns);
         for (int i = 0; i < nCoveredColumns; i++) {
-            byte[] cf = Bytes.readByteArray(input);
-            byte[] cq = Bytes.readByteArray(input);
-            ColumnReference ref = new ColumnReference(cf,cq);
-            coveredColumns.add(ref);
-            if(isLocalIndex) {
-                dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf)))));
-            }
+            byte[] dataTableCf = Bytes.readByteArray(input);
+            byte[] dataTableCq = Bytes.readByteArray(input);
+            ColumnReference dataTableRef = new ColumnReference(dataTableCf, dataTableCq);
+            byte[] indexTableCf = isLocalIndex ? IndexUtil.getLocalIndexColumnFamily(dataTableCf) : dataTableCf;
+            byte[] indexTableCq  = IndexUtil.getIndexColumnName(dataTableCf, dataTableCq);
+            ColumnReference indexTableRef = new ColumnReference(indexTableCf, indexTableCq);
+            coveredColumnsMap.put(dataTableRef, indexTableRef);
         }
         // Hack to serialize whether the index row key is optimizable
         int len = WritableUtils.readVInt(input);
@@ -1097,9 +1257,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             int numIndexedExpressions = WritableUtils.readVInt(input);
             indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
             for (int i = 0; i < numIndexedExpressions; i++) {
-            	Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
-            	expression.readFields(input);
-            	indexedExpressions.add(expression);
+                Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                expression.readFields(input);
+                indexedExpressions.add(expression);
             }
         }
         else {
@@ -1151,6 +1311,79 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         initCachedState();
     }
     
+        
+    public static IndexMaintainer fromProto(ServerCachingProtos.IndexMaintainer proto, RowKeySchema dataTableRowKeySchema, boolean isDataTableSalted) throws IOException {
+        IndexMaintainer maintainer = new IndexMaintainer(dataTableRowKeySchema, isDataTableSalted);
+        maintainer.nIndexSaltBuckets = proto.getSaltBuckets();
+        maintainer.isMultiTenant = proto.getIsMultiTenant();
+        maintainer.viewIndexId = proto.hasViewIndexId() ? proto.getViewIndexId().toByteArray() : null;
+        List<ServerCachingProtos.ColumnReference> indexedColumnsList = proto.getIndexedColumnsList();
+        maintainer.indexedColumns = new HashSet<ColumnReference>(indexedColumnsList.size());
+        for (ServerCachingProtos.ColumnReference colRefFromProto : indexedColumnsList) {
+            maintainer.indexedColumns.add(new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier().toByteArray()));
+        }
+        List<Integer> indexedColumnTypes = proto.getIndexedColumnTypeOrdinalList();
+        maintainer.indexedColumnTypes = new ArrayList<PDataType>(indexedColumnTypes.size());
+        for (Integer typeOrdinal : indexedColumnTypes) {
+            maintainer.indexedColumnTypes.add(PDataType.values()[typeOrdinal]);
+        }
+        maintainer.indexTableName = proto.getIndexTableName().toByteArray();
+        maintainer.rowKeyOrderOptimizable = proto.getRowKeyOrderOptimizable();
+        maintainer.dataEmptyKeyValueCF = proto.getDataTableEmptyKeyValueColFamily().toByteArray();
+        ServerCachingProtos.ImmutableBytesWritable emptyKeyValueColFamily = proto.getEmptyKeyValueColFamily();
+        maintainer.emptyKeyValueCFPtr = new ImmutableBytesPtr(emptyKeyValueColFamily.getByteArray().toByteArray(), emptyKeyValueColFamily.getOffset(), emptyKeyValueColFamily.getLength());
+        maintainer.indexedExpressions = new ArrayList<>();
+        try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getIndexedExpressions().toByteArray())) {
+            DataInput input = new DataInputStream(stream);
+            while (stream.available() > 0) {
+                int expressionOrdinal = WritableUtils.readVInt(input);
+                Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+                expression.readFields(input);
+                maintainer.indexedExpressions.add(expression);
+            }
+        }
+        maintainer.rowKeyMetaData = newRowKeyMetaData(maintainer, dataTableRowKeySchema, maintainer.indexedExpressions.size(), isDataTableSalted, maintainer.isMultiTenant);
+        try (ByteArrayInputStream stream = new ByteArrayInputStream(proto.getRowKeyMetadata().toByteArray())) {
+            DataInput input = new DataInputStream(stream);
+            maintainer.rowKeyMetaData.readFields(input);   
+        }
+        maintainer.nDataCFs = proto.getNumDataTableColFamilies();
+        maintainer.indexWALDisabled = proto.getIndexWalDisabled();
+        maintainer.estimatedIndexRowKeyBytes = proto.getIndexRowKeyByteSize();
+        maintainer.immutableRows = proto.getImmutable();
+        List<ColumnInfo> indexedColumnInfoList = proto.getIndexedColumnInfoList();
+        maintainer.indexedColumnsInfo = Sets.newHashSet();
+        for (ColumnInfo info : indexedColumnInfoList) {
+            maintainer.indexedColumnsInfo.add(new Pair<>(info.getFamilyName(), info.getColumnName()));
+        }
+        // Protobuf has no single-byte scalar type, so both schemes are serialized as a wider integer and must be cast back to byte here
+        maintainer.encodingScheme = PTable.QualifierEncodingScheme.fromSerializedValue((byte)proto.getEncodingScheme());
+        maintainer.immutableStorageScheme = PTable.ImmutableStorageScheme.fromSerializedValue((byte)proto.getImmutableStorageScheme());
+        maintainer.isLocalIndex = proto.getIsLocalIndex();
+        
+        List<ServerCachingProtos.ColumnReference> dataTableColRefsForCoveredColumnsList = proto.getDataTableColRefForCoveredColumnsList();
+        List<ServerCachingProtos.ColumnReference> indexTableColRefsForCoveredColumnsList = proto.getIndexTableColRefForCoveredColumnsList();
+        maintainer.coveredColumnsMap = Maps.newHashMapWithExpectedSize(dataTableColRefsForCoveredColumnsList.size());
+        boolean encodedColumnNames = maintainer.encodingScheme != NON_ENCODED_QUALIFIERS;
+        Iterator<ServerCachingProtos.ColumnReference> indexTableColRefItr = indexTableColRefsForCoveredColumnsList.iterator();
+        for (ServerCachingProtos.ColumnReference colRefFromProto : dataTableColRefsForCoveredColumnsList) {
+            ColumnReference dataTableColRef = new ColumnReference(colRefFromProto.getFamily().toByteArray(), colRefFromProto.getQualifier( ).toByteArray());
+            ColumnReference indexTableColRef;
+            if (encodedColumnNames) {
+                ServerCachingProtos.ColumnReference fromProto = indexTableColRefItr.next(); 
+                indexTableColRef = new ColumnReference(fromProto.getFamily().toByteArray(), fromProto.getQualifier( ).toByteArray());
+            } else {
+                byte[] cq = IndexUtil.getIndexColumnName(dataTableColRef.getFamily(), dataTableColRef.getQualifier());
+                byte[] cf = maintainer.isLocalIndex ? IndexUtil.getLocalIndexColumnFamily(dataTableColRef.getFamily()) : dataTableColRef.getFamily();
+                indexTableColRef = new ColumnReference(cf, cq);
+            }
+            maintainer.coveredColumnsMap.put(dataTableColRef, indexTableColRef);
+        }
+        maintainer.initCachedState();
+        return maintainer;
+    }
+    
+    @Deprecated // Only called by code older than our 4.10 release
     @Override
     public void write(DataOutput output) throws IOException {
         // Encode nIndexSaltBuckets and isMultiTenant together
@@ -1170,8 +1403,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             WritableUtils.writeVInt(output, type.ordinal());
         }
         // Encode coveredColumns.size() and whether or not this is a local index
-        WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? -1 : 1));
-        for (ColumnReference ref : coveredColumns) {
+        WritableUtils.writeVInt(output, (coveredColumnsMap.size() + 1) * (isLocalIndex ? -1 : 1));
+        for (ColumnReference ref : coveredColumnsMap.keySet()) {
             Bytes.writeByteArray(output, ref.getFamily());
             Bytes.writeByteArray(output, ref.getQualifier());
         }
@@ -1186,8 +1419,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         
         WritableUtils.writeVInt(output, indexedExpressions.size());
         for (Expression expression : indexedExpressions) {
-        	WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
-        	expression.write(output);
+            WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+            expression.write(output);
         }
         
         rowKeyMetaData.write(output);
@@ -1196,6 +1429,76 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         // Encode estimatedIndexRowKeyBytes and immutableRows together.
         WritableUtils.writeVInt(output, estimatedIndexRowKeyBytes * (immutableRows ? -1 : 1));
     }
+    
+    public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer maintainer) throws IOException {
+        ServerCachingProtos.IndexMaintainer.Builder builder = ServerCachingProtos.IndexMaintainer.newBuilder();
+        builder.setSaltBuckets(maintainer.nIndexSaltBuckets);
+        builder.setIsMultiTenant(maintainer.isMultiTenant);
+        if (maintainer.viewIndexId != null) {
+            builder.setViewIndexId(ByteStringer.wrap(maintainer.viewIndexId));
+        }
+        for (ColumnReference colRef : maintainer.indexedColumns) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder =  ServerCachingProtos.ColumnReference.newBuilder();
+            cRefBuilder.setFamily(ByteStringer.wrap(colRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(colRef.getQualifier()));
+            builder.addIndexedColumns(cRefBuilder.build());
+        }
+        for (PDataType dataType : maintainer.indexedColumnTypes) {
+            builder.addIndexedColumnTypeOrdinal(dataType.ordinal());
+        }
+        for (Entry<ColumnReference, ColumnReference> e : maintainer.coveredColumnsMap.entrySet()) {
+            ServerCachingProtos.ColumnReference.Builder cRefBuilder =  ServerCachingProtos.ColumnReference.newBuilder();
+            ColumnReference dataTableColRef = e.getKey();
+            cRefBuilder.setFamily(ByteStringer.wrap(dataTableColRef.getFamily()));
+            cRefBuilder.setQualifier(ByteStringer.wrap(dataTableColRef.getQualifier()));
+            builder.addDataTableColRefForCoveredColumns(cRefBuilder.build());
+            if (maintainer.encodingScheme != NON_ENCODED_QUALIFIERS) {
+                // Index table colRefs are serialized only when column names are encoded; otherwise fromProto() derives them from the data table colRefs.
+                ColumnReference indexTableColRef = e.getValue();
+                cRefBuilder =  ServerCachingProtos.ColumnReference.newBuilder();
+                cRefBuilder.setFamily(ByteStringer.wrap(indexTableColRef.getFamily()));
+                cRefBuilder.setQualifier(ByteStringer.wrap(indexTableColRef.getQualifier()));
+                builder.addIndexTableColRefForCoveredColumns(cRefBuilder.build());
+            }
+        }
+        builder.setIsLocalIndex(maintainer.isLocalIndex);
+        builder.setIndexTableName(ByteStringer.wrap(maintainer.indexTableName));
+        builder.setRowKeyOrderOptimizable(maintainer.rowKeyOrderOptimizable);
+        builder.setDataTableEmptyKeyValueColFamily(ByteStringer.wrap(maintainer.dataEmptyKeyValueCF));
+        ServerCachingProtos.ImmutableBytesWritable.Builder ibwBuilder = ServerCachingProtos.ImmutableBytesWritable.newBuilder();
+        ibwBuilder.setByteArray(ByteStringer.wrap(maintainer.emptyKeyValueCFPtr.get()));
+        ibwBuilder.setLength(maintainer.emptyKeyValueCFPtr.getLength());
+        ibwBuilder.setOffset(maintainer.emptyKeyValueCFPtr.getOffset());
+        builder.setEmptyKeyValueColFamily(ibwBuilder.build());
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            DataOutput output = new DataOutputStream(stream);
+            for (Expression expression : maintainer.indexedExpressions) {
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
+                expression.write(output);
+            }
+            builder.setIndexedExpressions(ByteStringer.wrap(stream.toByteArray()));
+        }
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            DataOutput output = new DataOutputStream(stream);
+            maintainer.rowKeyMetaData.write(output);
+            builder.setRowKeyMetadata(ByteStringer.wrap(stream.toByteArray()));
+        }
+        builder.setNumDataTableColFamilies(maintainer.nDataCFs);
+        builder.setIndexWalDisabled(maintainer.indexWALDisabled);
+        builder.setIndexRowKeyByteSize(maintainer.estimatedIndexRowKeyBytes);
+        builder.setImmutable(maintainer.immutableRows);
+        for (Pair<String, String> p : maintainer.indexedColumnsInfo) {
+            ServerCachingProtos.ColumnInfo.Builder ciBuilder = ServerCachingProtos.ColumnInfo.newBuilder();
+            if (p.getFirst() != null) {
+                ciBuilder.setFamilyName(p.getFirst());
+            }
+            ciBuilder.setColumnName(p.getSecond());
+            builder.addIndexedColumnInfo(ciBuilder.build());
+        }
+        builder.setEncodingScheme(maintainer.encodingScheme.getSerializedMetadataValue());
+        builder.setImmutableStorageScheme(maintainer.immutableStorageScheme.getSerializedMetadataValue());
+        return builder.build();
+    }
 
     public int getEstimatedByteSize() {
         int size = WritableUtils.getVIntSize(nIndexSaltBuckets);
@@ -1212,8 +1515,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             PDataType type = indexedColumnTypes.get(i);
             size += WritableUtils.getVIntSize(type.ordinal());
         }
-        size += WritableUtils.getVIntSize(coveredColumns.size());
-        for (ColumnReference ref : coveredColumns) {
+        Set<ColumnReference> dataTableColRefs = coveredColumnsMap.keySet();
+        size += WritableUtils.getVIntSize(dataTableColRefs.size());
+        for (ColumnReference ref : dataTableColRefs) {
             size += WritableUtils.getVIntSize(ref.getFamilyWritable().getSize());
             size += ref.getFamily().length;
             size += WritableUtils.getVIntSize(ref.getQualifierWritable().getSize());
@@ -1241,24 +1545,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * Init calculated state reading/creating
      */
     private void initCachedState() {
-        dataEmptyKeyValueRef =
-                new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(),
-                        QueryConstants.EMPTY_COLUMN_BYTES);
-
-        indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size());
-        for (ColumnReference ref : coveredColumns) {
-            indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName(
-                ref.getFamily(), ref.getQualifier())));
-        }
-
-        this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size());
+        byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst();
+        dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier);
+        this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size());
         // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key)
         this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size());
         for (Expression expression : indexedExpressions) {
         	KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                	if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) {
+                	if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) {
                 		indexedColumnTypes.add(expression.getDataType());
                 	}
                     return null;
@@ -1267,7 +1563,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             expression.accept(visitor);
         }
         allColumns.addAll(indexedColumns);
-        allColumns.addAll(coveredColumns);
+        allColumns.addAll(coveredColumnsMap.keySet());
         
         int dataPkOffset = (isDataTableSalted ? 1 : 0) + (isMultiTenant ? 1 : 0);
         int nIndexPkColumns = getIndexPkColumnCount();
@@ -1311,12 +1607,21 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
 
     private int getIndexPkColumnCount() {
-        return dataRowKeySchema.getFieldCount() + indexedExpressions.size() - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0);
+        return getIndexPkColumnCount(dataRowKeySchema, indexedExpressions.size(), isDataTableSalted, isMultiTenant);
+    }
+    
+    private static int getIndexPkColumnCount(RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) {
+        return rowKeySchema.getFieldCount() + numIndexExpressions - (isDataTableSalted ? 1 : 0) - (isMultiTenant ? 1 : 0);
     }
     
     private RowKeyMetaData newRowKeyMetaData() {
         return getIndexPkColumnCount() < 0xFF ? new ByteSizeRowKeyMetaData() : new IntSizedRowKeyMetaData();
     }
+    
+    private static RowKeyMetaData newRowKeyMetaData(IndexMaintainer i, RowKeySchema rowKeySchema, int numIndexExpressions, boolean isDataTableSalted, boolean isMultiTenant) {
+        int indexPkColumnCount = getIndexPkColumnCount(rowKeySchema, numIndexExpressions, isDataTableSalted, isMultiTenant);
+        return indexPkColumnCount < 0xFF ? i.new ByteSizeRowKeyMetaData() : i.new IntSizedRowKeyMetaData();
+    }
 
     private RowKeyMetaData newRowKeyMetaData(int capacity) {
         return capacity < 0xFF ? new ByteSizeRowKeyMetaData(capacity) : new IntSizedRowKeyMetaData(capacity);
@@ -1523,4 +1828,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             return udfParseNodes;
         }
     }
+    
+    public byte[] getEmptyKeyValueQualifier() {
+        return dataEmptyKeyValueRef.getQualifier();
+    }
+    
+    public Set<Pair<String, String>> getIndexedColumnInfo() {
+        return indexedColumnsInfo;
+    }
+    
+    public ImmutableStorageScheme getIndexStorageScheme() {
+        return immutableStorageScheme;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index 05a01b9..fcabdfd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -93,4 +93,5 @@ public class IndexMetaDataCacheClient {
          */
         return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 56849fe..9edcafc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -47,10 +47,10 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException {
+    public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException {
         // just use the standard keyvalue builder - this doesn't really need to be fast
         final List<IndexMaintainer> maintainers = 
-                IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE);
+                IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer);
         final Transaction txn;
         try {
             txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index bf1d0fb..05211c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -166,7 +166,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
             ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                    get.addColumn(expression.getColumnFamily(), expression.getColumnName());
+                    get.addColumn(expression.getColumnFamily(), expression.getColumnQualifier());
                     estimatedSizeHolder[0]++;
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 9d2955b..4116101 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Lists;
  */
 public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MD = "IdxMD";
+    public static final String INDEX_PROTO_MD = "IdxProtoMD";
     public static final String INDEX_UUID = "IdxUUID";
     public static final String INDEX_MAINTAINERS = "IndexMaintainers";
     private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index e515dbb..5da8be8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -226,4 +226,4 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
         }
         return indexTableNames;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index d22e957..39473dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -47,10 +47,15 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
         byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
         if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
-        byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        boolean useProto = false;
+        byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD);
+        useProto = md != null;
+        if (md == null) {
+            md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        }
         byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
-            final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+            final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto);
             final Transaction txn = MutationState.decodeTransaction(txState);
             return new IndexMetaDataCache() {
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index c67da6e..9ee5ea7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -67,7 +67,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
@@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 for (ColumnReference ref : mutableColumns) {
                     scan.addColumn(ref.getFamily(), ref.getQualifier());
                 }
+                /*
+                 * Indexes inherit the storage scheme of the data table, so all the indexes share the same storage
+                 * scheme and empty key value qualifier; the qualifier can be taken from the first index maintainer.
+                 * Note that this assumption would break if we start allowing new indexes over existing data tables
+                 * to use a different storage scheme than the data table.
+                 */
+                byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+                
                 // Project empty key value column
-                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
                 ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
                 scanRanges.initializeScan(scan);
                 TableName tableName = env.getRegion().getRegionInfo().getTable();
@@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
         if (scanner != null) {
             Result result;
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+                    .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             // Process existing data table rows by removing the old index row and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
@@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             // to generate point delete markers for all index rows that were added. We don't have Tephra
             // manage index rows in change sets because we don't want to be hit with the additional
             // memory hit and do not need to do conflict detection on index rows.
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
                 // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c0f9707..780d70f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,11 +17,14 @@
  */
 package org.apache.phoenix.iterate;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.schema.PTable.IndexType.LOCAL;
+import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 import java.io.ByteArrayInputStream;
@@ -78,12 +81,12 @@ import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
@@ -92,6 +95,8 @@ import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PrefixByteCodec;
 import org.apache.phoenix.util.PrefixByteDecoder;
@@ -159,7 +164,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return true;
     }
     
-    private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) {
+    private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) throws SQLException {
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
@@ -210,7 +215,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             // Project empty key value unless the column family containing it has
                             // been projected in its entirety.
                             if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
-                                scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                                scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst());
                             }
                         }
                     }
@@ -228,7 +233,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             if(offset!=null){
                 ScanUtil.addOffsetAttribute(scan, offset);
             }
-
             int cols = plan.getGroupBy().getOrderPreservingColumnCount();
             if (cols > 0 && keyOnlyFilter &&
                 !plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) &&
@@ -243,13 +247,90 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     ScanUtil.andFilterAtEnd(scan, new PageFilter(plan.getLimit()));
                 }
             }
-
+            scan.setAttribute(BaseScannerRegionObserver.QUALIFIER_ENCODING_SCHEME, new byte[]{table.getEncodingScheme().getSerializedMetadataValue()});
+            scan.setAttribute(BaseScannerRegionObserver.IMMUTABLE_STORAGE_ENCODING_SCHEME, new byte[]{table.getImmutableStorageScheme().getSerializedMetadataValue()});
+            // When analyzing the table, no key value lookups are performed,
+            // so there is no point in setting the qualifier range.
+            if (EncodedColumnsUtil.setQualifierRanges(table) && !ScanUtil.isAnalyzeTable(scan)) {
+                Pair<Integer, Integer> range = getEncodedQualifierRange(scan, context);
+                if (range != null) {
+                    scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, Bytes.toBytes(range.getFirst()));
+                    scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, Bytes.toBytes(range.getSecond()));
+                }
+            }
             if (optimizeProjection) {
                 optimizeProjection(context, scan, table, statement);
             }
         }
     }
+    
+    private static Pair<Integer, Integer> getEncodedQualifierRange(Scan scan, StatementContext context)
+            throws SQLException {
+        PTable table = context.getCurrentTable().getTable();
+        QualifierEncodingScheme encodingScheme = table.getEncodingScheme();
+        checkArgument(encodingScheme != QualifierEncodingScheme.NON_ENCODED_QUALIFIERS,
+            "Method should only be used for tables using encoded column names");
+        Pair<Integer, Integer> minMaxQualifiers = new Pair<>();
+        for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
+            byte[] cq = whereCol.getSecond();
+            if (cq != null) {
+                int qualifier = table.getEncodingScheme().decode(cq);
+                determineQualifierRange(qualifier, minMaxQualifiers);
+            }
+        }
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
 
+        Map<String, Pair<Integer, Integer>> qualifierRanges = EncodedColumnsUtil.getFamilyQualifierRanges(table);
+        for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+            if (entry.getValue() != null) {
+                for (byte[] cq : entry.getValue()) {
+                    if (cq != null) {
+                        int qualifier = table.getEncodingScheme().decode(cq);
+                        determineQualifierRange(qualifier, minMaxQualifiers);
+                    }
+                }
+            } else {
+                /*
+                 * All the columns of the column family are being projected. So we will need to
+                 * consider all the columns in the column family to determine the min-max range.
+                 */
+                String family = Bytes.toString(entry.getKey());
+                if (table.getType() == INDEX && table.getIndexType() == LOCAL && !IndexUtil.isLocalIndexFamily(family)) {
+                    // TODO(samarth): confirm with James why this mapping to the local index column family is needed.
+                    family = IndexUtil.getLocalIndexColumnFamily(family);
+                }
+                Pair<Integer, Integer> range = qualifierRanges.get(family);
+                if (range != null) {
+                    determineQualifierRange(range.getFirst(), minMaxQualifiers);
+                    determineQualifierRange(range.getSecond(), minMaxQualifiers);
+                }
+            }
+        }
+        if (minMaxQualifiers.getFirst() == null) {
+            return null;
+        }
+        return minMaxQualifiers;
+    }
+
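+    /**
+     * Widens the given (min, max) pair, in place, so that it includes the supplied qualifier.
+     * If the pair's first element is null, both elements are initialized to the qualifier.
+     * @param qualifier decoded column qualifier to include in the range
+     * @param minMaxQualifiers pair holding the minimum and maximum qualifiers seen so far
+     */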
+    private static void determineQualifierRange(Integer qualifier, Pair<Integer, Integer> minMaxQualifiers) {
+        if (minMaxQualifiers.getFirst() == null) {
+            minMaxQualifiers.setFirst(qualifier);
+            minMaxQualifiers.setSecond(qualifier);
+        } else {
+            if (minMaxQualifiers.getFirst() > qualifier) {
+                minMaxQualifiers.setFirst(qualifier);
+            } else if (minMaxQualifiers.getSecond() < qualifier) {
+                minMaxQualifiers.setSecond(qualifier);
+            }
+        }
+    }
+    
     private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
         Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
         // columnsTracker contain cf -> qualifiers which should get returned.
@@ -346,7 +427,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             // the ExplicitColumnTracker not to be used, though.
             if (!statement.isAggregate() && filteredColumnNotInProjection) {
                 ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                        columnsTracker, conditionOnlyCfs));
+                        columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getEncodingScheme())));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index 3293f65..1e5f09e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator {
         };
     }
     
-    private final static Tuple UNINITIALIZED = new ResultTuple();
+    private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE;
     private Tuple next = UNINITIALIZED;
     
     abstract protected Tuple advance() throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
index 8ada952..135ab26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
             return this.index;
         }
         
+        @Override
         public int size() {
             if (flushBuffer)
                 return flushedCount;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 8dcb2e8..e4c52c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Function;
@@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
             }
             this.byteSize = queueEntries.getByteSize();
         } catch (IOException e) {
-            throw new SQLException("", e);
+            ServerUtil.createIOException(e.getMessage(), e);
         } finally {
             delegate.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index 88e141a..4b89133 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,16 +24,27 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 
 public class RegionScannerResultIterator extends BaseResultIterator {
     private final RegionScanner scanner;
+    private final Pair<Integer, Integer> minMaxQualifiers;
+    private final boolean useQualifierAsIndex;
+    private final QualifierEncodingScheme encodingScheme;
     
-    public RegionScannerResultIterator(RegionScanner scanner) {
+    public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
         this.scanner = scanner;
+        this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
+        this.minMaxQualifiers = minMaxQualifiers;
+        this.encodingScheme = encodingScheme;
     }
     
     @Override
@@ -43,7 +54,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
         synchronized (scanner) {
             try {
                 // TODO: size
-                List<Cell> results = new ArrayList<Cell>();
+                List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) :  new ArrayList<Cell>();
                 // Results are potentially returned even when the return value of s.next is false
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
@@ -53,7 +64,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
                 }
                 // We instantiate a new tuple because in all cases currently we hang on to it
                 // (i.e. to compute and hold onto the TopN).
-                MultiKeyValueTuple tuple = new MultiKeyValueTuple();
+                Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
                 tuple.setKeyValues(results);
                 return tuple;
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index ebb2421..0bf7a71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
     public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
     public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP);
-
+    
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
 
@@ -322,6 +322,15 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     /** Version below which we fall back on the generic KeyValueBuilder */
     public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14");
     
+    public static final String IMMUTABLE_STORAGE_SCHEME = "IMMUTABLE_STORAGE_SCHEME";
+    public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(IMMUTABLE_STORAGE_SCHEME);
+    public static final String ENCODING_SCHEME = "ENCODING_SCHEME";
+    public static final byte[] ENCODING_SCHEME_BYTES = Bytes.toBytes(ENCODING_SCHEME);
+    public static final String COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+    public static final byte[] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER);
+    public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER";
+    public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER);
+
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
         this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;
@@ -595,9 +604,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
                 newCells.addAll(cells);
                 newCells.add(kv);
                 Collections.sort(newCells, KeyValue.COMPARATOR);
-                resultTuple.setResult(Result.create(newCells));
+                tuple = new ResultTuple(Result.create(newCells));
             }
-
             return tuple;
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 47c17ae..3ca48a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
     private final static String STRING_FALSE = "0";
     private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0);
     private final static Integer INTEGER_FALSE = Integer.valueOf(0);
-    private final static Tuple BEFORE_FIRST = new ResultTuple();
+    private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE;
 
     private final ResultIterator scanner;
     private final RowProjector rowProjector;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b49fc0d1/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index 908a117..921b412 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -69,7 +69,7 @@ public class HashCacheFactory implements ServerCacheFactory {
     }
 
     @Override
-    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException {
+    public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException {
         try {
             // This reads the uncompressed length from the front of the compressed input
             int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset());