Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/27 06:14:44 UTC

[11/17] phoenix git commit: PHOENIX-1598 Encode column names to save space and improve performance

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 4d3c0cf..32e9f68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package org.apache.phoenix.compile;
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -24,7 +25,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.FamilyWildcardParseNode;
@@ -43,11 +43,13 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ProjectedColumn;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -120,7 +122,7 @@ public class TupleProjectionCompiler {
             PColumn sourceColumn = table.getPKColumns().get(i);
             ColumnRef sourceColumnRef = new ColumnRef(tableRef, sourceColumn.getPosition());
             PColumn column = new ProjectedColumn(sourceColumn.getName(), sourceColumn.getFamilyName(), 
-                    position++, sourceColumn.isNullable(), sourceColumnRef);
+                    position++, sourceColumn.isNullable(), sourceColumnRef, null);
             projectedColumns.add(column);
         }
         for (PColumn sourceColumn : table.getColumns()) {
@@ -132,18 +134,18 @@ public class TupleProjectionCompiler {
                     && !families.contains(sourceColumn.getFamilyName().getString()))
                 continue;
             PColumn column = new ProjectedColumn(sourceColumn.getName(), sourceColumn.getFamilyName(), 
-                    position++, sourceColumn.isNullable(), sourceColumnRef);
+                    position++, sourceColumn.isNullable(), sourceColumnRef, sourceColumn.getColumnQualifierBytes());
             projectedColumns.add(column);
             // Wildcard or FamilyWildcard will be handled by ProjectionCompiler.
             if (!isWildcard && !families.contains(sourceColumn.getFamilyName())) {
-                context.getScan().addColumn(sourceColumn.getFamilyName().getBytes(), sourceColumn.getName().getBytes());
+            	EncodedColumnsUtil.setColumns(column, table, context.getScan());
             }
         }
         // add LocalIndexDataColumnRef
         for (LocalIndexDataColumnRef sourceColumnRef : visitor.localIndexColumnRefSet) {
             PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(), 
                     sourceColumnRef.getColumn().getFamilyName(), position++, 
-                    sourceColumnRef.getColumn().isNullable(), sourceColumnRef);
+                    sourceColumnRef.getColumn().isNullable(), sourceColumnRef, sourceColumnRef.getColumn().getColumnQualifierBytes());
             projectedColumns.add(column);
         }
         
@@ -154,9 +156,9 @@ public class TupleProjectionCompiler {
                 null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(),
                 table.getViewIndexId(),
                 table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), 
-                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter());
     }
-
+    
     public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException {
         PTable table = tableRef.getTable();
         boolean hasSaltingColumn = retainPKColumns && table.getBucketNum() != null;
@@ -169,20 +171,23 @@ public class TupleProjectionCompiler {
             String aliasedName = tableRef.getTableAlias() == null ? 
                       SchemaUtil.getColumnName(table.getName().getString(), colName) 
                     : SchemaUtil.getColumnName(tableRef.getTableAlias(), colName);
-
-            PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), 
-                    retainPKColumns && SchemaUtil.isPKColumn(sourceColumn) ? 
-                            null : PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), 
-                    position++, sourceColumn.isNullable(), sourceColumnRef);
+            PName familyName =  SchemaUtil.isPKColumn(sourceColumn) ? (retainPKColumns ? null : PNameFactory.newName(VALUE_COLUMN_FAMILY)) : sourceColumn.getFamilyName();
+            PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), familyName, 
+                    position++, sourceColumn.isNullable(), sourceColumnRef, sourceColumn.getColumnQualifierBytes());
             projectedColumns.add(column);
         }
+        EncodedCQCounter cqCounter = EncodedCQCounter.NULL_COUNTER;
+        if (EncodedColumnsUtil.usesEncodedColumnNames(table)) {
+            cqCounter = EncodedCQCounter.copy(table.getEncodedCQCounter());
+        }
+        
         return PTableImpl.makePTable(table.getTenantId(), PROJECTED_TABLE_SCHEMA, table.getName(), PTableType.PROJECTED,
                 null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
                 retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null,
                 Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(),
                 table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(),
-                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), cqCounter);
     }
 
     // For extracting column references from single select statement
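
A note on the TupleProjectionCompiler changes above: every ProjectedColumn
constructor call now threads through the source column's qualifier bytes
(null for PK columns, which have no cell qualifier), and createProjectedTable
copies the source table's EncodedCQCounter whenever encoded column names are
in use, so a projected table keeps drawing qualifiers from the same
per-family sequence. A minimal sketch of the counter contract that copy()
preserves (a stand-in class, not Phoenix's implementation; the starting slot
of 11 is an assumption):

    import java.util.HashMap;
    import java.util.Map;

    class CQCounterSketch {
        private final Map<String, Integer> counters = new HashMap<>();

        // Hand out the next numeric qualifier for a family and advance its counter.
        int getNextQualifier(String family) {
            int next = counters.getOrDefault(family, 11); // assumed first usable slot
            counters.put(family, next + 1);
            return next;
        }

        // Snapshot taken for a derived (projected) table, mirroring EncodedCQCounter.copy().
        static CQCounterSketch copy(CQCounterSketch other) {
            CQCounterSketch c = new CQCounterSketch();
            c.counters.putAll(other.counters);
            return c;
        }
    }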

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index bc3466c..e5e18e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -39,6 +39,8 @@ import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -84,11 +86,12 @@ public class UnionCompiler {
         for (int i = 0; i < plan.getProjector().getColumnCount(); i++) {
             ColumnProjector colProj = plan.getProjector().getColumnProjector(i);
             String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias();
+            PName colName = PNameFactory.newName(name);
             PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name),
                 UNION_FAMILY_NAME, targetTypes.get(i).getType(), targetTypes.get(i).getMaxLength(),
                 targetTypes.get(i).getScale(), colProj.getExpression().isNullable(), i,
                 targetTypes.get(i).getSortOrder(), 500, null, false,
-                colProj.getExpression().toString(), false, false);
+                colProj.getExpression().toString(), false, false, colName.getBytes());
             projectedColumns.add(projectedColumn);
         }
         Long scn = statement.getConnection().getSCN();
@@ -98,7 +101,7 @@ public class UnionCompiler {
             null, null, projectedColumns, null, null, null, true, null, null, null, true,
             true, true, null, null, null, false, false, 0, 0L,
             SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY,
-                statement.getConnection().getQueryServices().getProps()), null, false);
+                statement.getConnection().getQueryServices().getProps()), null, false, ImmutableStorageScheme.ONE_CELL_PER_COLUMN, QualifierEncodingScheme.NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER);
         TableRef tableRef = new TableRef(null, tempTable, 0, false);
         return tableRef;
     }
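
UnionCompiler, by contrast, keeps its synthetic result table on the legacy
path: ONE_CELL_PER_COLUMN storage, NON_ENCODED_QUALIFIERS, and the
NULL_COUNTER, so each projected column's qualifier is simply
colName.getBytes(). A hedged sketch of the difference between the two
regimes (the two-byte layout is illustrative only, not Phoenix's exact
encoding):

    import java.nio.charset.StandardCharsets;

    class QualifierSketch {
        enum Scheme { NON_ENCODED_QUALIFIERS, TWO_BYTE_QUALIFIERS }

        static byte[] qualifierFor(String columnName, int encodedValue, Scheme scheme) {
            if (scheme == Scheme.NON_ENCODED_QUALIFIERS) {
                // What the UnionCompiler path above stores: the name itself.
                return columnName.getBytes(StandardCharsets.UTF_8);
            }
            // Assumed big-endian two-byte rendering of the counter value.
            return new byte[] { (byte) (encodedValue >>> 8), (byte) encodedValue };
        }
    }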

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 18070d4..7a285a9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -748,7 +748,7 @@ public class UpsertCompiler {
                             if (ptr.getLength() > 0) {
                                 byte[] uuidValue = ServerCacheClient.generateId();
                                 scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                scan.setAttribute(PhoenixIndexCodec.INDEX_MD, ptr.get());
+                                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get());
                                 scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                             }
                             ResultIterator iterator = aggPlan.iterator();
@@ -917,10 +917,10 @@ public class UpsertCompiler {
                 UpdateColumnCompiler compiler = new UpdateColumnCompiler(context);
                 int nColumns = onDupKeyPairs.size();
                 List<Expression> updateExpressions = Lists.newArrayListWithExpectedSize(nColumns);
-                LinkedHashSet<PColumn>updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
+                LinkedHashSet<PColumn> updateColumns = Sets.newLinkedHashSetWithExpectedSize(nColumns + 1);
                 updateColumns.add(new PColumnImpl(
                         table.getPKColumns().get(0).getName(), // Use first PK column name as we know it won't conflict with others
-                        null, PVarbinary.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false));
+                        null, PVarbinary.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false, null));
                 for (Pair<ColumnName,ParseNode> columnPair : onDupKeyPairs) {
                     ColumnName colName = columnPair.getFirst();
                     PColumn updateColumn = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()).getColumn();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 39451b8..ed6c6cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.compile;
 
+import static org.apache.phoenix.util.EncodedColumnsUtil.isPossibleToUseEncodedCQFilter;
+
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.util.Collections;
@@ -36,6 +38,7 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
 import org.apache.phoenix.filter.MultiCFCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.MultiCQKeyValueComparisonFilter;
+import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.RowKeyComparisonFilter;
 import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
@@ -46,17 +49,21 @@ import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.SubqueryParseNode;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ExpressionUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
@@ -169,12 +176,14 @@ public class WhereCompiler {
         public Expression visit(ColumnParseNode node) throws SQLException {
             ColumnRef ref = resolveColumn(node);
             TableRef tableRef = ref.getTableRef();
+            Expression newColumnExpression = ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive());
             if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(ref.getColumn())) {
+                byte[] cq = tableRef.getTable().getImmutableStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS 
+                		? QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES : ref.getColumn().getColumnQualifierBytes();
                 // track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs
-                context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), ref.getColumn().getName()
-                        .getBytes());
+                context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq);
             }
-            return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive());
+			return newColumnExpression;
         }
 
         @Override
@@ -195,7 +204,7 @@ public class WhereCompiler {
             // just use that.
             try {
                 if (!SchemaUtil.isPKColumn(ref.getColumn())) {
-                    table.getColumn(ref.getColumn().getName().getString());
+                    table.getColumnForColumnName(ref.getColumn().getName().getString());
                 }
             } catch (AmbiguousColumnException e) {
                 disambiguateWithFamily = true;
@@ -223,6 +232,7 @@ public class WhereCompiler {
 
             }
         }
+        
         public Count getCount() {
             return count;
         }
@@ -258,6 +268,8 @@ public class WhereCompiler {
                     return null;
                 }
             });
+            QualifierEncodingScheme encodingScheme = context.getCurrentTable().getTable().getEncodingScheme();
+            ImmutableStorageScheme storageScheme = context.getCurrentTable().getTable().getImmutableStorageScheme();
             switch (counter.getCount()) {
             case NONE:
                 PTable table = context.getResolver().getTables().get(0).getTable();
@@ -270,7 +282,9 @@ public class WhereCompiler {
                 filter = disambiguateWithFamily ? new SingleCFCQKeyValueComparisonFilter(whereClause) : new SingleCQKeyValueComparisonFilter(whereClause);
                 break;
             case MULTIPLE:
-                filter = disambiguateWithFamily ? new MultiCFCQKeyValueComparisonFilter(whereClause) : new MultiCQKeyValueComparisonFilter(whereClause);
+                filter = isPossibleToUseEncodedCQFilter(encodingScheme, storageScheme) ? new MultiEncodedCQKeyValueComparisonFilter(
+                        whereClause, encodingScheme) : (disambiguateWithFamily ? new MultiCFCQKeyValueComparisonFilter(
+                        whereClause) : new MultiCQKeyValueComparisonFilter(whereClause));
                 break;
             }
             scan.setFilter(filter);
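
The MULTIPLE branch above is where the new encoding pays off in WHERE
evaluation: isPossibleToUseEncodedCQFilter gates a filter that can track
matched columns by their decoded numeric qualifier instead of comparing
qualifier bytes. A condensed sketch of the decision, with the filter classes
reduced to names and the gating predicate an assumption (encoded qualifiers
plus one cell per column):

    class FilterChoiceSketch {
        enum Encoding { NON_ENCODED_QUALIFIERS, TWO_BYTE_QUALIFIERS }
        enum Storage { ONE_CELL_PER_COLUMN, SINGLE_CELL_ARRAY_WITH_OFFSETS }

        static String chooseMultiColumnFilter(Encoding enc, Storage storage,
                boolean disambiguateWithFamily) {
            boolean encodedFilterUsable =
                    enc != Encoding.NON_ENCODED_QUALIFIERS && storage == Storage.ONE_CELL_PER_COLUMN;
            if (encodedFilterUsable) {
                return "MultiEncodedCQKeyValueComparisonFilter";
            }
            return disambiguateWithFamily
                    ? "MultiCFCQKeyValueComparisonFilter"
                    : "MultiCQKeyValueComparisonFilter";
        }
    }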

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index f6bd512..b4bda98 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -50,11 +51,15 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -78,12 +83,19 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String DELETE_CQ = "_DeleteCQ";
     public static final String DELETE_CF = "_DeleteCF";
     public static final String EMPTY_CF = "_EmptyCF";
+    public static final String EMPTY_COLUMN_QUALIFIER = "_EmptyColumnQualifier";
     public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
     public static final String GROUP_BY_LIMIT = "_GroupByLimit";
     public static final String LOCAL_INDEX = "_LocalIndex";
     public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
+    /* 
+    * Attribute to denote that the index maintainer has been serialized using its protobuf representation.
+    * Needed for backward compatibility purposes. TODO: get rid of this in next major release.
+    */
+    public static final String LOCAL_INDEX_BUILD_PROTO = "_LocalIndexBuildProto"; 
     public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema";
     public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin";
+    public static final String COLUMNS_STORED_IN_SINGLE_CELL = "_ColumnsStoredInSingleCell";
     public static final String VIEW_CONSTANTS = "_ViewConstants";
     public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
     public static final String REVERSE_SCAN = "_ReverseScan";
@@ -102,6 +114,12 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public final static String SCAN_OFFSET = "_RowOffset";
     public static final String SCAN_START_ROW_SUFFIX = "_ScanStartRowSuffix";
     public static final String SCAN_STOP_ROW_SUFFIX = "_ScanStopRowSuffix";
+    public final static String MIN_QUALIFIER = "_MinQualifier";
+    public final static String MAX_QUALIFIER = "_MaxQualifier";
+    public final static String USE_NEW_VALUE_COLUMN_QUALIFIER = "_UseNewValueColumnQualifier";
+    public final static String QUALIFIER_ENCODING_SCHEME = "_QualifierEncodingScheme";
+    public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = "_ImmutableStorageEncodingScheme";
+    public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = "_UseEncodedColumnQualifierList";
     
     /**
      * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
@@ -112,6 +130,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     /** Exposed for testing */
     public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";
     protected Configuration rawConf;
+    protected QualifierEncodingScheme encodingScheme;
+    protected boolean useNewValueColumnQualifier;
 
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
@@ -184,6 +204,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             // start exclusive and the stop inclusive.
             ScanUtil.setupReverseScan(scan);
         }
+        this.encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+        this.useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
         return s;
     }
 
@@ -308,14 +330,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
      * @param indexMaintainer
      * @param viewConstants
      */
-    protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+    RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
             final RegionScanner s, final int offset, final Scan scan,
             final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
             final Region dataRegion, final IndexMaintainer indexMaintainer,
             final byte[][] viewConstants, final TupleProjector projector,
-            final ImmutableBytesWritable ptr) {
+            final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {
         return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
-                dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr);
+                dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
     }
 
     /**
@@ -333,7 +355,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
      * @param tx current transaction
      * @param viewConstants
      */
-    protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+    RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
             final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs,
             final Expression[] arrayFuncRefs, final int offset, final Scan scan,
             final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
@@ -341,7 +363,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             Transaction tx, 
             final byte[][] viewConstants, final KeyValueSchema kvSchema,
             final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
-            final ImmutableBytesWritable ptr) {
+            final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) {
         return new RegionScanner() {
 
             private boolean hasReferences = checkForReferenceFiles();
@@ -438,11 +460,13 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                             tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
                     }
                     if (projector != null) {
-                        Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result)));
+                        Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result));
+                        Tuple tuple = projector.projectResults(toProject, useNewValueColumnQualifier);
                         result.clear();
                         result.add(tuple.getValue(0));
-                        if(arrayElementCell != null)
+                        if (arrayElementCell != null) {
                             result.add(arrayElementCell);
+                        }
                     }
                     // There is a scanattribute set to retrieve the specific array element
                     return next;
@@ -472,6 +496,16 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                         if (result.isEmpty()) {
                             return next;
                         }
+                        IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
+                            tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+                    }
+                    if (projector != null) {
+                        Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result));
+                        Tuple tuple = projector.projectResults(toProject, useNewValueColumnQualifier);
+                        result.clear();
+                        result.add(tuple.getValue(0));
+                        if(arrayElementCell != null)
+                            result.add(arrayElementCell);
                     }
                     IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns,
                         tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
@@ -524,7 +558,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
 
             private int replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs,
                     final Expression[] arrayFuncRefs, List<Cell> result) {
-                // make a copy of the results array here, as we're modifying it below
+             // make a copy of the results array here, as we're modifying it below
                 MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result));
                 // The size of both the arrays would be same?
                 // Using KeyValueSchema to set and retrieve the value
@@ -532,14 +566,15 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                 Cell rowKv = result.get(0);
                 for (KeyValueColumnExpression kvExp : arrayKVRefs) {
                     if (kvExp.evaluate(tuple, ptr)) {
-                        for (int idx = tuple.size() - 1; idx >= 0; idx--) {
-                            Cell kv = tuple.getValue(idx);
+                        ListIterator<Cell> itr = result.listIterator();
+                        while (itr.hasNext()) {
+                            Cell kv = itr.next();
                             if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
                                     kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
-                                && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length,
+                                && Bytes.equals(kvExp.getColumnQualifier(), 0, kvExp.getColumnQualifier().length,
                                         kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) {
                                 // remove the kv that has the full array values.
-                                result.remove(idx);
+                                itr.remove();
                                 break;
                             }
                         }
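
The MIN_QUALIFIER/MAX_QUALIFIER attributes introduced above are what make
useQualifierAsListIndex viable: when every qualifier in the scan decodes to
an integer in a known [min, max] range, cells can be stored and looked up by
array position rather than by comparator search over sorted cells. A minimal
sketch of the idea behind PositionBasedResultTuple and
EncodedColumnQualiferCellsList (a stand-in container; bounds checks elided):

    class PositionBasedCellsSketch<C> {
        private final Object[] slots;
        private final int minQualifier;

        PositionBasedCellsSketch(int minQualifier, int maxQualifier) {
            this.minQualifier = minQualifier;
            this.slots = new Object[maxQualifier - minQualifier + 1];
        }

        // O(1) insert: the decoded qualifier is the slot index.
        void add(int decodedQualifier, C cell) {
            slots[decodedQualifier - minQualifier] = cell;
        }

        // O(1) lookup by position, no byte-wise qualifier comparison.
        @SuppressWarnings("unchecked")
        C get(int decodedQualifier) {
            return (C) slots[decodedQualifier - minQualifier];
        }
    }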

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 2c194c9..67cc114 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
@@ -62,9 +63,13 @@ import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -123,15 +128,20 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                         .getEnvironment().getConfiguration());
 
         RegionScanner innerScanner = s;
-
-        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
-        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+        boolean useProto = false;
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+        useProto = localIndexBytes != null;
+        if (localIndexBytes == null) {
+            localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        }
+        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
         TupleProjector tupleProjector = null;
         byte[][] viewConstants = null;
         ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
 
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
         if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -139,14 +149,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             }
             ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
             innerScanner =
-                    getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
-                        c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
-        }
+                    getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, 
+                            c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
+        } 
 
         if (j != null) {
             innerScanner =
                     new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan),
-                            c.getEnvironment());
+                            c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier);
         }
 
         long limit = Long.MAX_VALUE;
@@ -380,7 +390,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             estDistVals = Math.max(MIN_DISTINCT_VALUES,
                             (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
         }
-
+        
+        Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
         final boolean spillableEnabled =
                 conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
 
@@ -391,12 +403,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         boolean success = false;
         try {
             boolean hasMore;
-
-            MultiKeyValueTuple result = new MultiKeyValueTuple();
+            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
             if (logger.isDebugEnabled()) {
                 logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
             }
-
             Region region = c.getEnvironment().getRegion();
             boolean acquiredLock = false;
             try {
@@ -404,7 +414,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 acquiredLock = true;
                 synchronized (scanner) {
                     do {
-                        List<Cell> results = new ArrayList<Cell>();
+                        List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
                         // Results are potentially returned even when the return
                         // value of s.next is false
                         // since this is an indication of whether or not there are
@@ -439,7 +449,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             }
         }
     }
-
+    
     /**
      * Used for an aggregate query in which the key order match the group by key order. In this
      * case, we can do the aggregation as we scan, by detecting when the group by key changes.
@@ -454,6 +464,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by "
                     + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
         }
+        final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
         return new BaseRegionScanner(scanner) {
             private long rowCount = 0;
             private ImmutableBytesPtr currentKey = null;
@@ -463,7 +475,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 boolean hasMore;
                 boolean atLimit;
                 boolean aggBoundary = false;
-                MultiKeyValueTuple result = new MultiKeyValueTuple();
+                Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
                 ImmutableBytesPtr key = null;
                 Aggregator[] rowAggregators = aggregators.getAggregators();
                 // If we're calculating no aggregate functions, we can exit at the
@@ -476,7 +488,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                     acquiredLock = true;
                     synchronized (scanner) {
                         do {
-                            List<Cell> kvs = new ArrayList<Cell>();
+                            List<Cell> kvs = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
                             // Results are potentially returned even when the return
                             // value of s.next is false
                             // since this is an indication of whether or not there
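
The LOCAL_INDEX_BUILD_PROTO / LOCAL_INDEX_BUILD handling at the top of this
observer is a plain two-attribute compatibility read: prefer the protobuf
form, remember which one was found so deserialization can branch, and fall
back for scans written by older clients. A sketch of that pattern with the
Scan replaced by a plain map (attribute literals as defined above):

    import java.util.Map;

    class LocalIndexBytesSketch {
        static final class Found {
            final byte[] bytes;
            final boolean proto;
            Found(byte[] bytes, boolean proto) { this.bytes = bytes; this.proto = proto; }
        }

        static Found readLocalIndexBytes(Map<String, byte[]> scanAttrs) {
            byte[] bytes = scanAttrs.get("_LocalIndexBuildProto"); // new serialization first
            if (bytes != null) {
                return new Found(bytes, true);
            }
            return new Found(scanAttrs.get("_LocalIndexBuild"), false); // legacy fallback
        }
    }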

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 4340886..79ff4b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ServerUtil;
@@ -62,9 +63,11 @@ public class HashJoinRegionScanner implements RegionScanner {
     private List<Tuple>[] tempTuples;
     private ValueBitSet tempDestBitSet;
     private ValueBitSet[] tempSrcBitSet;
-
+    private final boolean useQualifierAsListIndex;
+    private final boolean useNewValueColumnQualifier;
+    
     @SuppressWarnings("unchecked")
-    public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env) throws IOException {
+    public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env, boolean useQualifierAsIndex, boolean useNewValueColumnQualifier) throws IOException {
         this.env = env;
         this.scanner = scanner;
         this.projector = projector;
@@ -105,17 +108,18 @@ public class HashJoinRegionScanner implements RegionScanner {
             this.tempDestBitSet = ValueBitSet.newInstance(joinInfo.getJoinedSchema());
             this.projector.setValueBitSet(tempDestBitSet);
         }
+        this.useQualifierAsListIndex = useQualifierAsIndex;
+        this.useNewValueColumnQualifier = useNewValueColumnQualifier;
     }
 
     private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException {
         if (result.isEmpty())
             return;
-
-        Tuple tuple = new ResultTuple(Result.create(result));
+        Tuple tuple = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result));
         // For backward compatibility. In new versions, HashJoinInfo.forceProjection()
         // always returns true.
         if (joinInfo.forceProjection()) {
-            tuple = projector.projectResults(tuple);
+            tuple = projector.projectResults(tuple, useNewValueColumnQualifier);
         }
 
         // TODO: fix below Scanner.next() and Scanner.nextRaw() methods as well.
@@ -148,7 +152,7 @@ public class HashJoinRegionScanner implements RegionScanner {
             } else {
                 KeyValueSchema schema = joinInfo.getJoinedSchema();
                 if (!joinInfo.forceProjection()) { // backward compatibility
-                    tuple = projector.projectResults(tuple);
+                    tuple = projector.projectResults(tuple, useNewValueColumnQualifier);
                 }
                 resultQueue.offer(tuple);
                 for (int i = 0; i < count; i++) {
@@ -175,8 +179,8 @@ public class HashJoinRegionScanner implements RegionScanner {
                             Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                     lhs : TupleProjector.mergeProjectedValue(
                                             (ProjectedValueTuple) lhs, schema, tempDestBitSet,
-                                            null, joinInfo.getSchemas()[i], tempSrcBitSet[i],
-                                            joinInfo.getFieldPositions()[i]);
+                                            null, joinInfo.getSchemas()[i], tempSrcBitSet[i], 
+                                            joinInfo.getFieldPositions()[i], useNewValueColumnQualifier);
                             resultQueue.offer(joined);
                             continue;
                         }
@@ -184,8 +188,8 @@ public class HashJoinRegionScanner implements RegionScanner {
                             Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                     lhs : TupleProjector.mergeProjectedValue(
                                             (ProjectedValueTuple) lhs, schema, tempDestBitSet,
-                                            t, joinInfo.getSchemas()[i], tempSrcBitSet[i],
-                                            joinInfo.getFieldPositions()[i]);
+                                            t, joinInfo.getSchemas()[i], tempSrcBitSet[i], 
+                                            joinInfo.getFieldPositions()[i], useNewValueColumnQualifier);
                             resultQueue.offer(joined);
                         }
                     }
@@ -317,7 +321,6 @@ public class HashJoinRegionScanner implements RegionScanner {
                 processResults(result, false); // TODO detect if limit used here
                 result.clear();
             }
-            
             return nextInQueue(result);
         } catch (Throwable t) {
             ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t);
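
HashJoinRegionScanner now receives useQualifierAsIndex and
useNewValueColumnQualifier once, at construction, rather than re-deriving
them per row; processResults then only has to pick a tuple representation.
A stubbed sketch of that selection (the Tuple types are stand-ins for
Phoenix's):

    import java.util.List;

    class ProcessResultsSketch {
        interface Tuple {}
        static final class ResultTuple implements Tuple { ResultTuple(List<?> cells) {} }
        static final class PositionBasedResultTuple implements Tuple { PositionBasedResultTuple(List<?> cells) {} }

        private final boolean useQualifierAsListIndex; // fixed when the scanner is built

        ProcessResultsSketch(boolean useQualifierAsListIndex) {
            this.useQualifierAsListIndex = useQualifierAsListIndex;
        }

        Tuple wrap(List<?> result) {
            return useQualifierAsListIndex
                    ? new PositionBasedResultTuple(result) // dense decoded qualifiers
                    : new ResultTuple(result);             // classic cell ordering
        }
    }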

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index a0681fb..1858d0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -27,6 +27,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
@@ -34,6 +36,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS_BYT
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES;
@@ -57,6 +60,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
@@ -89,7 +93,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -189,8 +192,11 @@ import org.apache.phoenix.schema.PMetaDataEntity;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
@@ -209,10 +215,12 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -283,6 +291,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES);
     private static final KeyValue AUTO_PARTITION_SEQ_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, AUTO_PARTITION_SEQ_BYTES);
     private static final KeyValue APPEND_ONLY_SCHEMA_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, APPEND_ONLY_SCHEMA_BYTES);
+    private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
+    private static final KeyValue ENCODING_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ENCODING_SCHEME_BYTES);
     
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
@@ -309,7 +319,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             UPDATE_CACHE_FREQUENCY_KV,
             IS_NAMESPACE_MAPPED_KV,
             AUTO_PARTITION_SEQ_KV,
-            APPEND_ONLY_SCHEMA_KV
+            APPEND_ONLY_SCHEMA_KV,
+            STORAGE_SCHEME_KV,
+            ENCODING_SCHEME_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -339,6 +351,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV);
     private static final int AUTO_PARTITION_SEQ_INDEX = TABLE_KV_COLUMNS.indexOf(AUTO_PARTITION_SEQ_KV);
     private static final int APPEND_ONLY_SCHEMA_INDEX = TABLE_KV_COLUMNS.indexOf(APPEND_ONLY_SCHEMA_KV);
+    private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
+    private static final int QUALIFIER_ENCODING_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(ENCODING_SCHEME_KV);
 
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -352,6 +366,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue IS_VIEW_REFERENCED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
     private static final KeyValue COLUMN_DEF_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
     private static final KeyValue IS_ROW_TIMESTAMP_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES);
+    private static final KeyValue COLUMN_QUALIFIER_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES);
+
     private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
             DECIMAL_DIGITS_KV,
             COLUMN_SIZE_KV,
@@ -364,11 +380,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             VIEW_CONSTANT_KV,
             IS_VIEW_REFERENCED_KV,
             COLUMN_DEF_KV,
-            IS_ROW_TIMESTAMP_KV
+            IS_ROW_TIMESTAMP_KV,
+            COLUMN_QUALIFIER_KV
             );
     static {
         Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
     }
+    private static final KeyValue QUALIFIER_COUNTER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_COUNTER_BYTES);
     private static final int DECIMAL_DIGITS_INDEX = COLUMN_KV_COLUMNS.indexOf(DECIMAL_DIGITS_KV);
     private static final int COLUMN_SIZE_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_SIZE_KV);
     private static final int NULLABLE_INDEX = COLUMN_KV_COLUMNS.indexOf(NULLABLE_KV);
@@ -380,6 +398,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
     private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
     private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
+    private static final int COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_QUALIFIER_KV);
     
     private static final int LINK_TYPE_INDEX = 0;
 
@@ -717,8 +736,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
                         isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
                         isRowTimestampKV.getValueLength()));
-
-        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false);
+        
+        boolean isPkColumn = famName == null || famName.getString() == null;
+        Cell columnQualifierKV = colKeyValues[COLUMN_QUALIFIER_INDEX];
+        // Older tables won't have column qualifier metadata present. To make things simpler, just set the
+        // column qualifier bytes by using the column name.
+        byte[] columnQualifierBytes = columnQualifierKV != null ? 
+                Arrays.copyOfRange(columnQualifierKV.getValueArray(),
+                    columnQualifierKV.getValueOffset(), columnQualifierKV.getValueOffset()
+                            + columnQualifierKV.getValueLength()) : (isPkColumn ? null : colName.getBytes());
+        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifierBytes);
         columns.add(column);
     }
     
@@ -926,37 +953,55 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         boolean isAppendOnlySchema = isAppendOnlySchemaKv == null ? false
                 : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isAppendOnlySchemaKv.getValueArray(),
                     isAppendOnlySchemaKv.getValueOffset(), isAppendOnlySchemaKv.getValueLength()));
-        
+        Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
+        //TODO: change this once we start having other values for storage schemes
+        ImmutableStorageScheme storageScheme = storageSchemeKv == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : ImmutableStorageScheme
+                .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(),
+                        storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
+        Cell encodingSchemeKv = tableKeyValues[QUALIFIER_ENCODING_SCHEME_INDEX];
+        QualifierEncodingScheme encodingScheme = encodingSchemeKv == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : QualifierEncodingScheme
+                .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(encodingSchemeKv.getValueArray(),
+                    encodingSchemeKv.getValueOffset(), encodingSchemeKv.getValueLength()));
         
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = Lists.newArrayList();
         List<PName> physicalTables = Lists.newArrayList();
         PName parentTableName = tableType == INDEX ? dataTableName : null;
         PName parentSchemaName = tableType == INDEX ? schemaName : null;
+        EncodedCQCounter cqCounter =
+                (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER
+                        : new EncodedCQCounter();
         while (true) {
-          results.clear();
-          scanner.next(results);
-          if (results.isEmpty()) {
-              break;
-          }
-          Cell colKv = results.get(LINK_TYPE_INDEX);
-          int colKeyLength = colKv.getRowLength();
-          PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
-          int colKeyOffset = offset + colName.getBytes().length + 1;
-          PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
-          if (colName.getString().isEmpty() && famName != null) {
-              LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
-              if (linkType == LinkType.INDEX_TABLE) {
-                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
-              } else if (linkType == LinkType.PHYSICAL_TABLE) {
-                  physicalTables.add(famName);
-              } else if (linkType == LinkType.PARENT_TABLE) {
-                  parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
-                  parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
-              }
-          } else {
-              addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
-          }
+            results.clear();
+            scanner.next(results);
+            if (results.isEmpty()) {
+                break;
+            }
+            Cell colKv = results.get(LINK_TYPE_INDEX);
+            if (colKv != null) {
+                int colKeyLength = colKv.getRowLength();
+                PName colName = newPName(colKv.getRowArray(), colKv.getRowOffset() + offset, colKeyLength-offset);
+                int colKeyOffset = offset + colName.getBytes().length + 1;
+                PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset);
+                if (isQualifierCounterKV(colKv)) {
+                    Integer value = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), colKv.getValueOffset(), SortOrder.ASC);
+                    cqCounter.setValue(famName.getString(), value);
+                } else {
+                    if (colName.getString().isEmpty() && famName != null) {
+                        LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
+                        if (linkType == LinkType.INDEX_TABLE) {
+                            addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+                        } else if (linkType == LinkType.PHYSICAL_TABLE) {
+                            physicalTables.add(famName);
+                        } else if (linkType == LinkType.PARENT_TABLE) {
+                            parentTableName = PNameFactory.newName(SchemaUtil.getTableNameFromFullName(famName.getBytes()));
+                            parentSchemaName = PNameFactory.newName(SchemaUtil.getSchemaNameFromFullName(famName.getBytes()));
+                        }
+                    } else {
+                        addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
+                    }
+                } 
+            }
         }
         // Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
         // server while holding this lock is a bad idea and likely to cause contention.
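
The storage scheme, encoding scheme, and CQ counter above all follow one read-or-default pattern: a byte serialized into SYSTEM.CATALOG selects the value, and a missing cell (pre-upgrade metadata) falls back to the legacy behavior. A condensed sketch, assuming the enum and counter APIs introduced by this patch, with a nullable Byte and a boolean standing in for the catalog cell and the table type:

    // Hedged sketch; encodingByte and isView are stand-ins for catalog state.
    QualifierEncodingScheme encodingScheme = (encodingByte == null)
            ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS            // legacy default
            : QualifierEncodingScheme.fromSerializedValue(encodingByte);
    // Views never own a counter: their column qualifiers are doled out from the
    // base physical table's counter, so the shared NULL_COUNTER is used instead.
    EncodedCQCounter cqCounter =
            (!EncodedColumnsUtil.usesEncodedColumnNames(encodingScheme) || isView)
                    ? EncodedCQCounter.NULL_COUNTER
                    : new EncodedCQCounter();
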
@@ -964,9 +1009,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 pkName, saltBucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalTables, defaultFamilyName,
                 viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
                 rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount,
-                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema);
+                indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, storageScheme, encodingScheme, cqCounter);
     }
-
+    
+    private boolean isQualifierCounterKV(Cell kv) {
+        int cmp =
+                Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(),
+                    kv.getQualifierLength(), QUALIFIER_COUNTER_KV.getQualifierArray(),
+                    QUALIFIER_COUNTER_KV.getQualifierOffset(), QUALIFIER_COUNTER_KV.getQualifierLength());
+        return cmp == 0;
+    }
+    
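
isQualifierCounterKV above relies on the zero-copy comparison idiom: rather than materializing the cell's qualifier into a fresh byte[], it compares the backing-array slice directly. In general form, for any two HBase Cells a and b (a sketch, not Phoenix code):

    // Zero-copy qualifier equality between two cells: no byte[] allocation.
    boolean sameQualifier = Bytes.compareTo(
            a.getQualifierArray(), a.getQualifierOffset(), a.getQualifierLength(),
            b.getQualifierArray(), b.getQualifierOffset(), b.getQualifierLength()) == 0;
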
     private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException {
         List<Cell> results = Lists.newArrayList();
         scanner.next(results);
@@ -1437,7 +1490,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // tableMetadata and set the view statement and partition column correctly
                 if (parentTable!=null && parentTable.getAutoPartitionSeqName()!=null) {
                     long autoPartitionNum = 1;
-                    final Properties props = new Properties();
                     try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
                         Statement stmt = connection.createStatement()) {
                         String seqName = parentTable.getAutoPartitionSeqName();
@@ -1505,46 +1557,46 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 Short indexId = null;
                 if (request.hasAllocateIndexId() && request.getAllocateIndexId()) {
                     String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes);
-                    try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)){
-                    PName physicalName = parentTable.getPhysicalName();
-                    int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
-                    SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
+                    try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+                        PName physicalName = parentTable.getPhysicalName();
+                        int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
+                        SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
                             nSequenceSaltBuckets, parentTable.isNamespaceMapped() );
                        // TODO Review: earlier the sequence was created at (SCN-1/LATEST_TIMESTAMP) and incremented at the client at max(SCN, dataTable.getTimestamp),
                        // but it seems we should always use LATEST_TIMESTAMP so that connections with and without an SCN
                        // don't see inconsistent sequence values.
-                    long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
-                    try {
-                        connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
+                        long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
+                        try {
+                            connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
                                 Short.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false, sequenceTimestamp);
-                    } catch (SequenceAlreadyExistsException e) {
-                    }
-                    long[] seqValues = new long[1];
-                    SQLException[] sqlExceptions = new SQLException[1];
-                    connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
+                        } catch (SequenceAlreadyExistsException e) {
+                        }
+                        long[] seqValues = new long[1];
+                        SQLException[] sqlExceptions = new SQLException[1];
+                        connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
                             HConstants.LATEST_TIMESTAMP, seqValues, sqlExceptions);
-                    if (sqlExceptions[0] != null) {
-                        throw sqlExceptions[0];
-                    }
-                    long seqValue = seqValues[0];
-                    if (seqValue > Short.MAX_VALUE) {
-                        builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
-                        builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                        done.run(builder.build());
-                        return;
-                    }
-                    Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
-                    NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
-                    List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
-                    Cell cell = cells.get(0);
-                    PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
-                    Object val = dataType.toObject(seqValue, PLong.INSTANCE);
-                    byte[] bytes = new byte [dataType.getByteSize() + 1];
-                    dataType.toBytes(val, bytes, 0);
-                    Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES,
+                        if (sqlExceptions[0] != null) {
+                            throw sqlExceptions[0];
+                        }
+                        long seqValue = seqValues[0];
+                        if (seqValue > Short.MAX_VALUE) {
+                            builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
+                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            done.run(builder.build());
+                            return;
+                        }
+                        Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
+                        NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
+                        List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
+                        Cell cell = cells.get(0);
+                        PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
+                        Object val = dataType.toObject(seqValue, PLong.INSTANCE);
+                        byte[] bytes = new byte[dataType.getByteSize() + 1];
+                        dataType.toBytes(val, bytes, 0);
+                        Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES,
                             cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes);
-                    cells.add(indexIdCell);
-                    indexId = (short) seqValue;
+                        cells.add(indexIdCell);
+                        indexId = (short) seqValue;
                     }
                 }
                 
@@ -2169,6 +2221,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 int pkCount = getVarChars(m.getRow(), rkmd);
                 // check if this put is for adding a column
                 if (pkCount > COLUMN_NAME_INDEX
+                        && rkmd[COLUMN_NAME_INDEX] != null && rkmd[COLUMN_NAME_INDEX].length > 0
                         && Bytes.compareTo(schemaName, rkmd[SCHEMA_NAME_INDEX]) == 0
                         && Bytes.compareTo(tableName, rkmd[TABLE_NAME_INDEX]) == 0) {
                     columnPutsForBaseTable.add(new PutWithOrdinalPosition((Put)m, getInteger((Put)m, TABLE_FAMILY_BYTES, ORDINAL_POSITION_BYTES)));
@@ -2221,8 +2274,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
                 String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
                 try {
-                    existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily(
-                            columnFamily).getColumn(columnName);
+                    existingViewColumn = columnFamily == null ? view.getColumnForColumnName(columnName) : view.getColumnFamily(
+                            columnFamily).getPColumnForColumnName(columnName);
                 } catch (ColumnFamilyNotFoundException e) {
                     // ignore since it means that the column family is not present for the column to be added.
                 } catch (ColumnNotFoundException e) {
@@ -2551,8 +2604,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 byte[] columnKey = getColumnKey(viewKey, columnName, columnFamily);
                 try {
                     existingViewColumn =
-                            columnFamily == null ? view.getColumn(columnName) : view
-                                    .getColumnFamily(columnFamily).getColumn(columnName);
+                            columnFamily == null ? view.getColumnForColumnName(columnName) : view
+                                    .getColumnFamily(columnFamily).getPColumnForColumnName(columnName);
                 } catch (ColumnFamilyNotFoundException e) {
                     // ignore since it means that the column family is not present for the column to
                     // be added.
@@ -2618,7 +2671,25 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     
     private MetaDataMutationResult validateColumnForAddToBaseTable(PColumn existingViewColumn, Put columnToBeAdded, PTable basePhysicalTable, boolean isColumnToBeAddPkCol, PTable view) {
         if (existingViewColumn != null) {
-            
+            if (EncodedColumnsUtil.usesEncodedColumnNames(basePhysicalTable) && !SchemaUtil.isPKColumn(existingViewColumn)) {
+                /*
+                 * If the column already exists in a view, then we cannot add the column to the
+                 * base table. The reason is subtle: consider a table with two views that both
+                 * declare the same key value column KV. Encoded column qualifiers for view
+                 * columns are doled out using the counters stored in the base physical table,
+                 * so KV may be assigned a different qualifier in each view, for example 11 in
+                 * VIEW1 and 12 in VIEW2. Rows inserted through the two views then store the
+                 * column named KV under different qualifiers. When an attempt is later made to
+                 * add KV to the base table, there is no qualifier we can assign it. A number
+                 * other than 11 or 12 would make SELECT KV FROM BASETABLE return null even
+                 * though KV is present in rows inserted through both views. Using 11 or 12 is
+                 * no better: we would then return only the KV values written through one of
+                 * the views.
+                 */
+                return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, EnvironmentEdgeManager.currentTimeMillis(), basePhysicalTable);
+            }
             // Validate data type is same
             int baseColumnDataType = getInteger(columnToBeAdded, TABLE_FAMILY_BYTES, DATA_TYPE_BYTES);
             if (baseColumnDataType != existingViewColumn.getDataType().getSqlType()) {
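
The scenario described in the comment is concrete enough to script. Below is a hypothetical JDBC reproduction; the table, view, and connection names are invented, and it assumes a server at this patch level with column name encoding enabled on the base table:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    // Hypothetical repro of the qualifier conflict; names and URL are illustrative.
    public class ViewQualifierConflict {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE BASETABLE (PK VARCHAR PRIMARY KEY, V1 VARCHAR)");
                stmt.execute("CREATE VIEW VIEW1 AS SELECT * FROM BASETABLE");
                stmt.execute("CREATE VIEW VIEW2 AS SELECT * FROM BASETABLE");
                stmt.execute("ALTER VIEW VIEW1 ADD KV VARCHAR"); // might be assigned qualifier 11
                stmt.execute("ALTER VIEW VIEW2 ADD KV VARCHAR"); // might be assigned qualifier 12
                // No single qualifier can now satisfy both views' existing rows,
                // so the server rejects this with UNALLOWED_TABLE_MUTATION:
                stmt.execute("ALTER TABLE BASETABLE ADD KV VARCHAR");
            }
        }
    }
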
@@ -2848,6 +2919,16 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                     return mutationResult;
                             } 
                         }
+                    } else if (type == PTableType.VIEW
+                            && EncodedColumnsUtil.usesEncodedColumnNames(table)) {
+                        /*
+                         * When adding a column to a view that uses the encoded column name
+                         * scheme, we modify the CQ counters stored in the view's physical
+                         * table. To make sure clients get the latest PTable, we must
+                         * therefore invalidate that table's cache entry.
+                         */
+                        invalidateList.add(new ImmutableBytesPtr(MetaDataUtil
+                                .getPhysicalTableRowForView(table)));
                     }
                     for (Mutation m : tableMetaData) {
                         byte[] key = m.getRow();
@@ -2861,7 +2942,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                         && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
                                     PColumnFamily family =
                                             table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                                    family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                    family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                 } else if (pkCount > COLUMN_NAME_INDEX
                                         && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                     addingPKColumn = true;
@@ -3114,7 +3195,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                         PColumnFamily family =
                                                 table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
                                         columnToDelete =
-                                                family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                                family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                     } else if (pkCount > COLUMN_NAME_INDEX
                                             && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                         deletePKColumn = true;
@@ -3203,10 +3284,12 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             byte[] indexKey =
                     SchemaUtil.getTableKey(tenantId, index.getSchemaName().getBytes(), index
                             .getTableName().getBytes());
+            Pair<String, String> columnToDeleteInfo = new Pair<>(columnToDelete.getFamilyName().getString(), columnToDelete.getName().getString());
+            ColumnReference colDropRef = new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete.getColumnQualifierBytes());
+            boolean isColumnIndexed = indexMaintainer.getIndexedColumnInfo().contains(columnToDeleteInfo);
+            boolean isCoveredColumn = indexMaintainer.getCoveredColumns().contains(colDropRef);
             // If index requires this column for its pk, then drop it
-            if (indexMaintainer.getIndexedColumns().contains(
-                new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete
-                        .getName().getBytes()))) {
+            if (isColumnIndexed) {
                 // Since we're dropping the index, lock it to ensure
                 // that a change in index state doesn't
                 // occur while we're dropping it.
@@ -3227,9 +3310,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 invalidateList.add(new ImmutableBytesPtr(indexKey));
             }
             // If the dropped column is a covered index column, invalidate the index
-            else if (indexMaintainer.getCoveredColumns().contains(
-                new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete
-                        .getName().getBytes()))) {
+            else if (isCoveredColumn) {
                 invalidateList.add(new ImmutableBytesPtr(indexKey));
             }
         }
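
Beyond hoisting the two contains() checks into named booleans, the substantive change in this hunk is that the dropped column is now identified by its encoded qualifier rather than its name. In sketch form, using the APIs as they appear in the patch:

    // With encoded columns the cell qualifier is no longer the column name, so the
    // covered-column membership test must key on the qualifier bytes instead.
    ColumnReference colDropRef = new ColumnReference(
            columnToDelete.getFamilyName().getBytes(),
            columnToDelete.getColumnQualifierBytes());   // was getName().getBytes()
    boolean isCoveredColumn = indexMaintainer.getCoveredColumns().contains(colDropRef);
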

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 83290db..dd445ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -85,8 +85,9 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 = MIN_TABLE_TIMESTAMP + 18;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 = MIN_TABLE_TIMESTAMP + 18;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0 = MIN_TABLE_TIMESTAMP + 20;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 = MIN_TABLE_TIMESTAMP + 25;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0;
     
     // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). 
     // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string.
@@ -101,6 +102,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, "4.7.x");
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, "4.8.x");
         TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0, "4.9.x");
+        TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0, "4.10.x");
     }
     
     public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER; 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 8f36803..9482d37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -387,7 +387,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 							IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
 									conn);
 							byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-							dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+							dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
                             LOG.info("Starting to partially build indexes:" + indexesToPartiallyRebuild
                                     + " on data table:" + dataPTable.getName() + " with the earliest disable timestamp:"
                                     + earliestDisableTimestamp + " till "
@@ -511,4 +511,4 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 				put);
 
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ade88db..02b05f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFromScan;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -44,6 +46,7 @@ import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.function.ArrayIndexFunction;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -57,10 +60,13 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -107,7 +113,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         }
     }
 
-    public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+    private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
         byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
         if (topN == null) {
             return null;
@@ -125,7 +131,8 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
                 orderByExpression.readFields(input);
                 orderByExpressions.add(orderByExpression);
             }
-            ResultIterator inner = new RegionScannerResultIterator(s);
+            QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+            ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
             return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null,
                     estimatedRowSize);
         } catch (IOException e) {
@@ -151,7 +158,9 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             DataInputStream input = new DataInputStream(stream);
             int arrayKVRefSize = WritableUtils.readVInt(input);
             for (int i = 0; i < arrayKVRefSize; i++) {
-                KeyValueColumnExpression kvExp = new KeyValueColumnExpression();
+                ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
+                KeyValueColumnExpression kvExp = scheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN ? new SingleCellColumnExpression()
+                        : new KeyValueColumnExpression();
                 kvExp.readFields(input);
                 arrayKVRefs.add(kvExp);
             }
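
The loop above shows the storage-scheme dispatch used throughout this patch: the diff assigns a SingleCellColumnExpression to a KeyValueColumnExpression reference, so the two share the readFields contract and only the concrete type differs. Condensed, under the same assumptions:

    // The scan attribute decides which concrete expression deserializes the bytes.
    ImmutableStorageScheme scheme = EncodedColumnsUtil.getImmutableStorageScheme(scan);
    KeyValueColumnExpression kvExp = (scheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN)
            ? new SingleCellColumnExpression()   // columns packed into a single cell
            : new KeyValueColumnExpression();    // classic one-cell-per-column layout
    kvExp.readFields(input);
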
@@ -209,8 +218,13 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         if (dataColumns != null) {
             tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
             dataRegion = c.getEnvironment().getRegion();
-            byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
-            List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+            byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+            boolean useProto = localIndexBytes != null;
+            if (localIndexBytes == null) {
+                localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+            }
+            List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
             indexMaintainer = indexMaintainers.get(0);
             viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
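
The two-attribute lookup above is the rolling-upgrade pattern this patch applies in several places: prefer the new protobuf attribute, fall back to the legacy Writable one, and record which was found so the right decoder runs. A generic sketch; the attribute names here are placeholders, not Phoenix constants:

    // Read-new-else-legacy: newer clients set the proto attribute, older ones
    // the legacy attribute; the boolean selects the matching deserializer.
    byte[] payload = scan.getAttribute("NEW_PROTO_ATTR");
    boolean useProto = payload != null;
    if (payload == null) {
        payload = scan.getAttribute("LEGACY_ATTR");
    }
    List<IndexMaintainer> maintainers =
            (payload == null) ? null : IndexMaintainer.deserialize(payload, useProto);
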
@@ -219,21 +233,22 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
 
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
+        boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(getMinMaxQualifiersFromScan(scan)) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null;
         innerScanner =
                 getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan,
                     dataColumns, tupleProjector, dataRegion, indexMaintainer, tx,
-                    viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
+                    viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr, useQualifierAsIndex);
 
         final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan);
         if (j != null) {
-            innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment());
+            innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier);
         }
         if (scanOffset != null) {
             innerScanner = getOffsetScanner(c, innerScanner,
-                    new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset),
+                    new OffsetResultIterator(new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset),
                     scan.getAttribute(QueryConstants.LAST_SCAN) != null);
         }
-        final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
+        final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner);
         if (iterator == null) {
             return innerScanner;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3c7ff99b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index bf889d5..98f57ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -72,7 +72,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C
           (Class<ServerCacheFactory>) Class.forName(request.getCacheFactory().getClassName());
       ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
       tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()),
-        cachePtr, txState, cacheFactory);
+        cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer());
     } catch (Throwable e) {
       ProtobufUtil.setControllerException(controller, new IOException(e));
     }
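
The hasHasProtoBufIndexMaintainer() && getHasProtoBufIndexMaintainer() pair above is the standard protobuf-2 idiom for an optional bool: absence (an old client that never set the field) must behave exactly like an explicit false. In isolation:

    // has*() distinguishes "unset" from "false"; unset collapses to false so
    // pre-upgrade clients that never populate the field keep working.
    boolean usesProtoMaintainer = request.hasHasProtoBufIndexMaintainer()
            && request.getHasProtoBufIndexMaintainer();
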