You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/05/28 00:45:08 UTC

[2/4] phoenix git commit: Optimize order by and grouped aggregations by taking advantage of column encoding

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4b4caa2..b30e68d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -33,7 +33,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
@@ -41,7 +41,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
@@ -86,6 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
 import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
 import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
@@ -119,6 +120,8 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import co.cask.tephra.TxConstants;
+
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -130,7 +133,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexExpressionCompiler;
@@ -190,6 +192,7 @@ import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable.EncodedCQCounter;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.StorageScheme;
@@ -219,7 +222,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
@@ -227,8 +229,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 
-import co.cask.tephra.TxConstants;
-
+import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER;
 public class MetaDataClient {
     private static final Logger logger = LoggerFactory.getLogger(MetaDataClient.class);
 
@@ -289,7 +290,7 @@ public class MetaDataClient {
             LINK_TYPE + "," +
             PARENT_TENANT_ID + " " + PVarchar.INSTANCE.getSqlTypeName() + // Dynamic column for now to prevent schema change
             ") VALUES (?, ?, ?, ?, ?, ?)";
-    private static final String UPDATE_ENCODED_COLUMN_COUNT = 
+    private static final String UPDATE_ENCODED_COLUMN_COUNTER = 
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
             TENANT_ID + ", " + 
             TABLE_SCHEM + "," +
@@ -826,7 +827,7 @@ public class MetaDataClient {
         argUpsert.execute();
     }
 
-    private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK, Map<String, Integer> nextEncodedColumnQualifiers) throws SQLException {
+    private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK, EncodedCQCounter encodedColumnQualifier) throws SQLException {
         try {
             ColumnName columnDefName = def.getColumnDefName();
             SortOrder sortOrder = def.getSortOrder();
@@ -874,17 +875,8 @@ public class MetaDataClient {
                 }
                 isNull = false;
             }
-            Integer columnQualifier = null;
-            if (!isPK && nextEncodedColumnQualifiers != null) {
-                columnQualifier = nextEncodedColumnQualifiers.get(familyName.getString());
-                if (columnQualifier == null) {
-                    // We use columnQualifier 0 for the special empty key value.
-                    columnQualifier = 1;
-                }
-                nextEncodedColumnQualifiers.put(familyName.toString(), columnQualifier + 1);
-            }
             PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false, columnQualifier);
+                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false, isPK ? null : encodedColumnQualifier.getValue());
             return column;
         } catch (IllegalArgumentException e) { // Based on precondition check in constructor
             throw new SQLException(e);
@@ -1921,10 +1913,8 @@ public class MetaDataClient {
             int position = positionOffset;
             
             StorageScheme storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES;
-            Map<String, Integer> nextCQCounters = null;
-            Map<String, Integer> updatedPhysicalTableCQCounters = null;
+            EncodedCQCounter cqCounter = NULL_COUNTER;
             PTable viewPhysicalTable = null;
-            //TODO: samarth what about local indexes.
             if (SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))) {
                 // System tables have hard-coded column qualifiers. So we can't use column encoding for them.
                 storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES;
@@ -1938,14 +1928,13 @@ public class MetaDataClient {
                 } else {
                     /*
                      * For regular phoenix views, use the storage scheme of the physical table since they all share the
-                     * the same HTable. Views always use the base table's column qualifier counters for doling out
-                     * encoded column qualifiers.
+                     * the same HTable. Views always use the base table's column qualifier counter for doling out
+                     * encoded column qualifier.
                      */
                     viewPhysicalTable = connection.getTable(new PTableKey(null, physicalNames.get(0).getString()));
                     storageScheme = viewPhysicalTable.getStorageScheme();
                     if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) {
-                        nextCQCounters  = viewPhysicalTable.getEncodedCQCounters();
-                        updatedPhysicalTableCQCounters = Maps.newHashMapWithExpectedSize(colDefs.size());
+                        cqCounter  = viewPhysicalTable.getEncodedCQCounter();
                     }
                 }
             } else {
@@ -1963,8 +1952,8 @@ public class MetaDataClient {
                  * If the hbase table already exists, then possibly encoded or non-encoded column qualifiers already exist. 
                  * In this case we pursue ahead with non-encoded column qualifier scheme. If the phoenix table already exists 
                  * then we rely on the PTable, with appropriate storage scheme, returned in the MetadataMutationResult to be updated 
-                 * in the client cache. If it doesn't then the non-encoded column qualifier scheme works because we cannot control 
-                 * the column qualifiers that were used when populating the hbase table.
+                 * in the client cache. If the phoenix table already doesn't exist then the non-encoded column qualifier scheme works
+                 * because we cannot control the column qualifiers that were used when populating the hbase table.
                  */
                 byte[] tableNameBytes = SchemaUtil.getTableNameAsBytes(schemaName, tableName);
                 boolean tableExists = true;
@@ -1987,10 +1976,11 @@ public class MetaDataClient {
                     storageScheme = StorageScheme.ENCODED_COLUMN_NAMES;
                 }
                 if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) {
-                    nextCQCounters  = Maps.newHashMapWithExpectedSize(colDefs.size() - pkColumns.size());
+                    cqCounter = new EncodedCQCounter(ENCODED_CQ_COUNTER_INITIAL_VALUE);
                 }
             }
             
+            Integer initialCounterValue = cqCounter.getValue();
             for (ColumnDef colDef : colDefs) {
                 rowTimeStampColumnAlreadyFound = checkAndValidateRowTimestampCol(colDef, pkConstraint, rowTimeStampColumnAlreadyFound, tableType);
                 if (colDef.isPK()) { // i.e. the column is declared as CREATE TABLE COLNAME DATATYPE PRIMARY KEY...
@@ -2009,11 +1999,13 @@ public class MetaDataClient {
                                 .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                     }
                 }
-                PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false, nextCQCounters);
-                String cf = column.getFamilyName() != null ? column.getFamilyName().getString() : null;
-                if (updatedPhysicalTableCQCounters != null && cf != null && EncodedColumnsUtil.hasEncodedColumnName(column)) {
-                    updatedPhysicalTableCQCounters.put(cf, nextCQCounters.get(cf));
+                ColumnName columnDefName = colDef.getColumnDefName();
+                PColumn column = null;
+                column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false, cqCounter);
+                if (incrementEncodedCQCounter(storageScheme, pkConstraint, colDef, columnDefName)) {
+                    cqCounter.increment();
                 }
+                String cf = column.getFamilyName() != null ? column.getFamilyName().getString() : null;
                 if (SchemaUtil.isPKColumn(column)) {
                     // TODO: remove this constraint?
                     if (pkColumnsIterator.hasNext() && !column.getName().getString().equals(pkColumnsIterator.next().getFirst().getColumnName())) {
@@ -2050,42 +2042,34 @@ public class MetaDataClient {
             }
             
             if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) {
-                // Store the encoded column counter for each column family for phoenix entities that have their own hbase
+                // Store the encoded column counter for phoenix entities that have their own hbase
                 // tables i.e. base tables and indexes.
-                Map<String, Integer> mapToUse = tableType == VIEW ? updatedPhysicalTableCQCounters : nextCQCounters;
-                if (tableType != VIEW && nextCQCounters.isEmpty()) {
-                    // Case when a table or index has only pk columns.
-                    nextCQCounters.put(defaultFamilyName == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : defaultFamilyName, 1);
-                }
-                if (mapToUse != null) {
-                    PreparedStatement linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNT);
-                    String schemaNameToUse = tableType == VIEW ? viewPhysicalTable.getSchemaName().getString() : schemaName;
-                    String tableNameToUse = tableType == VIEW ? viewPhysicalTable.getTableName().getString() : tableName;
-                    // For local indexes and indexes on views, pass on the the tenant id i.e. all their meta-data rows have
-                    // tenant ids in there.
-                    boolean sharedIndex = tableType == PTableType.INDEX && (indexType == IndexType.LOCAL || parent.getType() == PTableType.VIEW);
-                    String tenantIdToUse = connection.getTenantId() != null && sharedIndex ? connection.getTenantId().getString() : null;
-                    for (Entry<String, Integer> entry : mapToUse.entrySet()) {
-                        String familyName = entry.getKey();
-                        Integer nextQualifier = entry.getValue();
-                        linkStatement.setString(1, tenantIdToUse);
-                        linkStatement.setString(2, schemaNameToUse);
-                        linkStatement.setString(3, tableNameToUse);
-                        linkStatement.setString(4, familyName);
-                        linkStatement.setInt(5, nextQualifier);
-                        linkStatement.execute();
-                    }
+                String schemaNameToUse = tableType == VIEW ? viewPhysicalTable.getSchemaName().getString() : schemaName;
+                String tableNameToUse = tableType == VIEW ? viewPhysicalTable.getTableName().getString() : tableName;
+                // For local indexes and indexes on views, pass on the the tenant id since all their meta-data rows have
+                // tenant ids in there.
+                boolean sharedIndex = tableType == PTableType.INDEX && (indexType == IndexType.LOCAL || parent.getType() == PTableType.VIEW);
+                String tenantIdToUse = connection.getTenantId() != null && sharedIndex ? connection.getTenantId().getString() : null;
+                //TODO: samarth I think we can safely use the default column family here
+                String familyName = QueryConstants.DEFAULT_COLUMN_FAMILY;
+                try (PreparedStatement linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER)) {
+                    linkStatement.setString(1, tenantIdToUse);
+                    linkStatement.setString(2, schemaNameToUse);
+                    linkStatement.setString(3, tableNameToUse);
+                    linkStatement.setString(4, familyName);
+                    linkStatement.setInt(5, cqCounter.getValue());
+                    linkStatement.execute();
+                }
 
-                    // When a view adds its own columns, then we need to increase the sequence number of the base table
-                    // too since we want clients to get the latest PTable of the base table.
-                    if (tableType == VIEW && updatedPhysicalTableCQCounters != null && !updatedPhysicalTableCQCounters.isEmpty()) {
-                        PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM);
-                        incrementStatement.setString(1, null);
-                        incrementStatement.setString(2, viewPhysicalTable.getSchemaName().getString());
-                        incrementStatement.setString(3, viewPhysicalTable.getTableName().getString());
-                        incrementStatement.setLong(4, viewPhysicalTable.getSequenceNumber() + 1);
-                        incrementStatement.execute();
-                    }
+                // When a view adds its own columns, then we need to increase the sequence number of the base table
+                // too since we want clients to get the latest PTable of the base table.
+                if (tableType == VIEW  && cqCounter.getValue() != initialCounterValue) {
+                    PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM);
+                    incrementStatement.setString(1, null);
+                    incrementStatement.setString(2, viewPhysicalTable.getSchemaName().getString());
+                    incrementStatement.setString(3, viewPhysicalTable.getTableName().getString());
+                    incrementStatement.setLong(4, viewPhysicalTable.getSequenceNumber() + 1);
+                    incrementStatement.execute();
                 }
             }
             
@@ -2172,7 +2156,7 @@ public class MetaDataClient {
                         Collections.<PTable>emptyList(), isImmutableRows,
                         Collections.<PName>emptyList(), defaultFamilyName == null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L, isNamespaceMapped, StorageScheme.NON_ENCODED_COLUMN_NAMES, ImmutableMap.<String, Integer>of());
+                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L, isNamespaceMapped, StorageScheme.NON_ENCODED_COLUMN_NAMES, PTable.EncodedCQCounter.NULL_COUNTER);
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -2338,14 +2322,18 @@ public class MetaDataClient {
                 throw new ConcurrentTableMutationException(schemaName, tableName);
             default:
                 PName newSchemaName = PNameFactory.newName(schemaName);
-                // Views always rely on the parent table's map to dole out encoded column qualifiers.
-                Map<String, Integer> qualifierMapToBe = tableType == PTableType.VIEW ? ImmutableMap.<String, Integer>of() : nextCQCounters;
+                /*
+                 * It doesn't hurt for the PTable of views to have the cqCounter. However, views always rely on the
+                 * parent table's counter to dole out encoded column qualifiers. So setting the counter as NULL_COUNTER
+                 * for extra safety.
+                 */
+                EncodedCQCounter cqCounterToBe = tableType == PTableType.VIEW ? NULL_COUNTER : cqCounter;
                 PTable table =  PTableImpl.makePTable(
                         tenantId, newSchemaName, PNameFactory.newName(tableName), tableType, indexState, timestamp!=null ? timestamp : result.getMutationTime(),
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
                         dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, isNamespaceMapped, storageScheme, qualifierMapToBe);
+                        indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, isNamespaceMapped, storageScheme, cqCounterToBe);
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
@@ -2355,6 +2343,15 @@ public class MetaDataClient {
         }
     }
 
+    private static boolean incrementEncodedCQCounter(StorageScheme storageScheme, PrimaryKeyConstraint pkConstraint,
+            ColumnDef colDef, ColumnName columnDefName) {
+        return storageScheme == StorageScheme.ENCODED_COLUMN_NAMES && !(colDef.isPK() || (pkConstraint != null && pkConstraint.getColumnWithSortOrder(columnDefName) != null));
+    }
+    
+    private static boolean incrementEncodedCQCounter(StorageScheme storageScheme, ColumnDef colDef) {
+        return storageScheme == StorageScheme.ENCODED_COLUMN_NAMES && !colDef.isPK();
+    }
+
     private byte[][] getSplitKeys(List<HRegionLocation> allTableRegions) {
         if(allTableRegions.size() == 1) return null;
         byte[][] splitKeys = new byte[allTableRegions.size()-1][];
@@ -2906,15 +2903,16 @@ public class MetaDataClient {
                 Set<String> families = new LinkedHashSet<>();
                 PTableType tableType = table.getType();
                 PTable tableForCQCounters = null;
-                Map<String, Integer> cqCountersToUse = null;
-                Map<String, Integer> cfWithUpdatedCQCounters = null; 
+                EncodedCQCounter cqCounterToUse = NULL_COUNTER;
+                StorageScheme storageScheme = table.getStorageScheme();
+                Integer initialCounterValue = null;
                 if (columnDefs.size() > 0 ) {
                     //FIXME: samarth change this to fetch table from server if client cache doesn't have it. What about local indexes?
                     //FIXME: samarth fix this mess of getting table names from connection
                     //TODO: samarth should these be guarded by storage scheme check. Better to have the map always available. immutable empty for views and non encoded.
                     tableForCQCounters = tableType == PTableType.VIEW ? connection.getTable(new PTableKey(null, table.getPhysicalName().getString())) : table;
-                     cqCountersToUse = tableForCQCounters.getEncodedCQCounters();
-                    cfWithUpdatedCQCounters = cqCountersToUse != null ? Maps.<String, Integer>newHashMapWithExpectedSize(columnDefs.size()) : null; 
+                    cqCounterToUse = tableForCQCounters.getEncodedCQCounter();
+                    initialCounterValue = cqCounterToUse.getValue();
                     try (PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN_ALTER_TABLE)) {
                         short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
                         for( ColumnDef colDef : columnDefs) {
@@ -2934,12 +2932,11 @@ public class MetaDataClient {
                                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
                                 .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                             }
-                            PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, cqCountersToUse);
-                            String cf = column.getFamilyName() != null ? column.getFamilyName().getString() : null;
-                            if (cfWithUpdatedCQCounters != null && cf != null && EncodedColumnsUtil.hasEncodedColumnName(column)) {
-                                cfWithUpdatedCQCounters.put(cf, cqCountersToUse.get(cf));
-                            }
+                            PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, cqCounterToUse);
                             columns.add(column);
+                            if (incrementEncodedCQCounter(storageScheme, colDef)) {
+                                cqCounterToUse.increment();
+                            }
                             String pkName = null;
                             Short keySeq = null;
                             
@@ -2976,7 +2973,7 @@ public class MetaDataClient {
                                         ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName()));
                                         Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, ++pkSlotPosition));
                                         ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
-                                        PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, cqCountersToUse);
+                                        PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, NULL_COUNTER);
                                         addColumnMutation(schemaName, index.getTableName().getString(), indexColumn, colUpsert, index.getParentTableName().getString(), index.getPKName() == null ? null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
                                     }
                                 }
@@ -3027,21 +3024,21 @@ public class MetaDataClient {
                 boolean sharedIndex = tableType == PTableType.INDEX && (table.getIndexType() == IndexType.LOCAL || table.getViewIndexId() != null);
                 String tenantIdToUse = connection.getTenantId() != null && sharedIndex ? connection.getTenantId().getString() : null;
                 //TODO: samarth I am not sure this is going to work on server side. But for now lets add these mutations here.
-                if (cfWithUpdatedCQCounters != null && !cfWithUpdatedCQCounters.isEmpty()) {
+                if (cqCounterToUse.getValue() != initialCounterValue) {
                     PreparedStatement linkStatement;
-                    if (!sharedIndex) {
-                        linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNT);
-                        for (Entry<String, Integer> entry : cfWithUpdatedCQCounters.entrySet()) {
-                            String familyName = entry.getKey();
-                            Integer nextQualifier = entry.getValue();
-                            linkStatement.setString(1, tenantIdToUse);
-                            linkStatement.setString(2, tableForCQCounters.getSchemaName().getString());
-                            linkStatement.setString(3, tableForCQCounters.getTableName().getString());
-                            linkStatement.setString(4, familyName);
-                            linkStatement.setInt(5, nextQualifier);
-                            linkStatement.execute();
-                        }
-                    }
+                    //TODO: samarth i don't think we need the shared index check here.
+                    //if (!sharedIndex) {
+                        linkStatement = connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER);
+                        //TODO: samarth should be ok to use the default column family here.    
+                        String familyName = QueryConstants.DEFAULT_COLUMN_FAMILY;
+                        linkStatement.setString(1, tenantIdToUse);
+                        linkStatement.setString(2, tableForCQCounters.getSchemaName().getString());
+                        linkStatement.setString(3, tableForCQCounters.getTableName().getString());
+                        linkStatement.setString(4, familyName);
+                        linkStatement.setInt(5, cqCounterToUse.getValue());
+                        linkStatement.execute();
+
+                    //}
                     // When a view adds its own columns, then we need to increase the sequence number of the base table
                     // too since we want clients to get the latest PTable of the base table.
                     if (tableType == VIEW) {
@@ -3372,11 +3369,13 @@ public class MetaDataClient {
                         Map<String, List<TableRef>> tenantIdTableRefMap = Maps.newHashMap();
                         if (result.getSharedTablesToDelete() != null) {
                             for (SharedTableState sharedTableState : result.getSharedTablesToDelete()) {
+                                //TODO: samarth I don't think we really care about storage scheme and cq counter at this point.
+                                //Probably worthy to change the constructor here to not expect the two arguments.
                                 PTableImpl viewIndexTable = new PTableImpl(sharedTableState.getTenantId(),
                                         sharedTableState.getSchemaName(), sharedTableState.getTableName(), ts,
                                         table.getColumnFamilies(), sharedTableState.getColumns(),
                                         sharedTableState.getPhysicalNames(), sharedTableState.getViewIndexId(),
-                                        table.isMultiTenant(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters());
+                                        table.isMultiTenant(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter());
                                 TableRef indexTableRef = new TableRef(viewIndexTable);
                                 PName indexTableTenantId = sharedTableState.getTenantId();
                                 if (indexTableTenantId==null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index e42000e..ca911df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.schema;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -385,5 +387,51 @@ public interface PTable extends PMetaDataEntity {
     long getUpdateCacheFrequency();
     boolean isNamespaceMapped();
     StorageScheme getStorageScheme();
-    Map<String, Integer> getEncodedCQCounters();
+    EncodedCQCounter getEncodedCQCounter();
+    
+    /**
+     * Wrapper around {@link java.lang.Integer} to help track and update counter values.
+     */
+    public class EncodedCQCounter {
+        
+        @Nullable private Integer counter;
+        public static final EncodedCQCounter NULL_COUNTER = new EncodedCQCounter(null); 
+        
+        public EncodedCQCounter(Integer initialValue) {
+            counter = initialValue;
+        }
+        
+        @Nullable
+        public Integer getValue() {
+            return counter;
+        }
+        
+        public void increment() {
+            if (counter != null) {
+                counter++;
+            }
+        }
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((counter == null) ? 0 : counter.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            EncodedCQCounter other = (EncodedCQCounter)obj;
+            if (counter == null) {
+                if (other.counter != null) return false;
+            } else if (!counter.equals(other.counter)) return false;
+            return true;
+        }
+
+
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 897317a..6978fa7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -28,7 +28,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -47,7 +46,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.generated.PGuidePostsProtos;
 import org.apache.phoenix.coprocessor.generated.PGuidePostsProtos.PGuidePosts;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
-import org.apache.phoenix.coprocessor.generated.PTableProtos.EncodedColumnQualifierCounter;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -146,7 +144,7 @@ public class PTableImpl implements PTable {
     private long updateCacheFrequency;
     private boolean isNamespaceMapped;
     private StorageScheme storageScheme;
-    private Map<String, Integer> encodedCQCounters;
+    private EncodedCQCounter encodedCQCounter;
 
     public PTableImpl() {
         this.indexes = Collections.emptyList();
@@ -179,7 +177,7 @@ public class PTableImpl implements PTable {
     
     // For indexes stored in shared physical tables
     public PTableImpl(PName tenantId, PName schemaName, PName tableName, long timestamp, List<PColumnFamily> families, 
-            List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped, StorageScheme storageScheme, Map<String, Integer> encodedColumnQualifierCounters) throws SQLException {
+            List<PColumn> columns, List<PName> physicalNames, Short viewIndexId, boolean multiTenant, boolean isNamespaceMpped, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         this.pkColumns = this.allColumns = Collections.emptyList();
         this.rowKeySchema = RowKeySchema.EMPTY_SCHEMA;
         this.indexes = Collections.emptyList();
@@ -193,7 +191,7 @@ public class PTableImpl implements PTable {
         init(tenantId, this.schemaName, this.tableName, PTableType.INDEX, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
             PTableStats.EMPTY_STATS, this.schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
             null, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, storageScheme, encodedColumnQualifierCounters);
+            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, storageScheme, encodedCQCounter);
     }
 
     public PTableImpl(long timeStamp) { // For delete marker
@@ -236,7 +234,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
@@ -245,7 +243,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
@@ -254,7 +252,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
                 table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -263,7 +261,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters());
+                table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
@@ -273,7 +271,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), isNamespaceMapped, table.getStorageScheme(), table.getEncodedCQCounters());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), isNamespaceMapped, table.getStorageScheme(), table.getEncodedCQCounter());
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -283,7 +281,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
@@ -293,7 +291,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters());
+                table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException {
@@ -303,7 +301,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats,
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -312,12 +310,12 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp, boolean isNamespaceMapped, StorageScheme storageScheme, Map<String, Integer> encodedColumnQualifierCounters) throws SQLException {
+            long indexDisableTimestamp, boolean isNamespaceMapped, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
                 indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional,
-                updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedColumnQualifierCounters);
+                updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedCQCounter);
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -326,12 +324,12 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            @NotNull PTableStats stats, int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped, StorageScheme storageScheme, Map<String, Integer> encodedColumnQualifierCounters)
+            @NotNull PTableStats stats, int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
                 defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
-                indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedColumnQualifierCounters);
+                indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedCQCounter);
     }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -339,11 +337,11 @@ public class PTableImpl implements PTable {
             PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
-            PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, boolean isNamespaceMapped, StorageScheme storageScheme, Map<String, Integer> encodedColumnQualifierCounters) throws SQLException {
+            PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, boolean isNamespaceMapped, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-                isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedColumnQualifierCounters);
+                isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedCQCounter);
     }
     
     @Override
@@ -376,7 +374,7 @@ public class PTableImpl implements PTable {
             PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
-            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, boolean isNamespaceMapped, StorageScheme storageScheme, Map<String, Integer> encodedColumnQualifierCounters) throws SQLException {
+            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, boolean isNamespaceMapped, StorageScheme storageScheme, EncodedCQCounter encodedCQCounter) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -551,7 +549,7 @@ public class PTableImpl implements PTable {
         }
         this.estimatedSize = estimatedSize;
         this.baseColumnCount = baseColumnCount;
-        this.encodedCQCounters = encodedColumnQualifierCounters;
+        this.encodedCQCounter = encodedCQCounter;
     }
 
     @Override
@@ -1172,22 +1170,20 @@ public class PTableImpl implements PTable {
       if (table.hasStorageScheme()) {
           storageScheme = StorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]);
       }
-      int numCounters = table.getEncodedColumnQualifierCountersCount();
-      Map<String, Integer> encodedColumnQualifierCounters = null;
-      if (numCounters > 0) {
-          encodedColumnQualifierCounters = Maps.newHashMapWithExpectedSize(numCounters);
-          for (int i = 0; i < numCounters; i++) {
-              PTableProtos.EncodedColumnQualifierCounter c = table.getEncodedColumnQualifierCounters(i);
-              encodedColumnQualifierCounters.put(c.getFamilyName(), c.getCounter());
-          }
+      EncodedCQCounter encodedColumnQualifierCounter = null;
+      if (table.hasEncodedColumnQualifierCounter()) {
+          encodedColumnQualifierCounter = new EncodedCQCounter(table.getEncodedColumnQualifierCounter());
+      } else {
+          encodedColumnQualifierCounter = PTable.EncodedCQCounter.NULL_COUNTER;
       }
+      
       try {
         PTableImpl result = new PTableImpl();
         result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
             (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes,
             isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
             multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedColumnQualifierCounters);
+            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedColumnQualifierCounter);
         return result;
       } catch (SQLException e) {
         throw new RuntimeException(e); // Impossible
@@ -1282,14 +1278,8 @@ public class PTableImpl implements PTable {
       if (table.getStorageScheme() != null) {
           builder.setStorageScheme(ByteStringer.wrap(new byte[]{table.getStorageScheme().getSerializedValue()}));
       }
-      Map<String, Integer> encodedColumnQualifierCounters = table.getEncodedCQCounters();
-      if (encodedColumnQualifierCounters != null) {
-          for (Entry<String, Integer> entry : encodedColumnQualifierCounters.entrySet()) {
-              EncodedColumnQualifierCounter.Builder b = EncodedColumnQualifierCounter.newBuilder();
-              b.setFamilyName(entry.getKey());
-              b.setCounter(entry.getValue());
-              builder.addEncodedColumnQualifierCounters(b.build());
-          }
+      if (table.getEncodedCQCounter() != PTable.EncodedCQCounter.NULL_COUNTER) {
+          builder.setEncodedColumnQualifierCounter(table.getEncodedCQCounter().getValue());
       }
       return builder.build();
     }
@@ -1340,7 +1330,7 @@ public class PTableImpl implements PTable {
     }
     
     @Override
-    public Map<String, Integer> getEncodedCQCounters() {
-        return encodedCQCounters;
+    public EncodedCQCounter getEncodedCQCounter() {
+        return encodedCQCounter;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java
index a04adf7..fa30f54 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BoundedSkipNullCellsList.java
@@ -17,33 +17,60 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.concurrent.NotThreadSafe;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable.StorageScheme;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PInteger;
 
-import com.google.common.base.Preconditions;
-
+/**
+ * List implementation that provides indexed based look up when the cell column qualifiers are generated using the
+ * {@link StorageScheme#ENCODED_COLUMN_NAMES} scheme. The api methods in this list assume that the caller wants to see
+ * and add only non null elements in the list. Such an assumption makes the implementation mimic the behavior that one
+ * would get when passing an {@link ArrayList} to hbase for filling in the key values returned by scanners. This
+ * implementation doesn't implement all the optional methods of the {@link List} interface which should be OK. A lot of
+ * things would be screwed up if HBase starts expecting that the the list implementation passed in to scanners
+ * implements all the optional methods of the interface too.
+ * 
+ * For getting elements out o
+ */
+@NotThreadSafe
 public class BoundedSkipNullCellsList implements List<Cell> {
-    
-    private final int minQualifier;
-    private final int maxQualifier;
+
+    private int minQualifier;
+    private int maxQualifier;
     private final Cell[] array;
     private int numNonNullElements;
     private int firstNonNullElementIdx = -1;
-    
+    private static final String RESERVED_RANGE = "(" + ENCODED_EMPTY_COLUMN_NAME + ", "
+            + (QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE - 1) + ")";
+
     public BoundedSkipNullCellsList(int minQualifier, int maxQualifier) {
-        Preconditions.checkArgument(minQualifier <= maxQualifier);
+        checkArgument(minQualifier <= maxQualifier, "Invalid arguments. Min: " + minQualifier + ". Max: " + maxQualifier);
+        if (!(minQualifier == maxQualifier && minQualifier == ENCODED_EMPTY_COLUMN_NAME)) {
+            checkArgument(minQualifier >= ENCODED_CQ_COUNTER_INITIAL_VALUE, "Argument minQualifier " + minQualifier + " needs to lie outside of the reserved range: " + RESERVED_RANGE);
+        }
         this.minQualifier = minQualifier;
         this.maxQualifier = maxQualifier;
-        this.array = new Cell[maxQualifier - minQualifier + 1];
+        int reservedRangeSize = ENCODED_CQ_COUNTER_INITIAL_VALUE - ENCODED_EMPTY_COLUMN_NAME;
+        this.array = new Cell[reservedRangeSize + maxQualifier - ENCODED_CQ_COUNTER_INITIAL_VALUE + 1];
     }
-    
+
     @Override
     public int size() {
         return numNonNullElements;
@@ -56,21 +83,41 @@ public class BoundedSkipNullCellsList implements List<Cell> {
 
     @Override
     public boolean contains(Object o) {
-        throwUnsupportedOperationException();
-        return false;
+        return indexOf(o) >= 0;
     }
 
-    
+
+    /**
+     * This implementation only returns an array of non-null elements in the list.
+     */
     @Override
     public Object[] toArray() {
-        throwUnsupportedOperationException();
-        return null;
+        Object[] toReturn = new Object[numNonNullElements];
+        int counter = 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null) {
+                toReturn[counter++] = array[i];
+            }
+        }
+        return toReturn;
     }
 
+
+    /**
+     * This implementation only returns an array of non-null elements in the list.
+     * This is not the most efficient way of copying elemts into an array 
+     */
     @Override
+    @SuppressWarnings("unchecked")
     public <T> T[] toArray(T[] a) {
-        throwUnsupportedOperationException();
-        return null;
+        T[] toReturn = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), numNonNullElements);
+        int counter = 0;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] != null) {
+                toReturn[counter++] = (T)array[i];
+            }
+        }
+        return toReturn;
     }
 
     @Override
@@ -78,7 +125,7 @@ public class BoundedSkipNullCellsList implements List<Cell> {
         if (e == null) {
             throw new NullPointerException();
         }
-        int columnQualifier = (int)PInteger.INSTANCE.toObject(e.getQualifierArray(), e.getQualifierOffset(), e.getQualifierLength());
+        int columnQualifier = PInteger.INSTANCE.getCodec().decodeInt(e.getQualifierArray(), e.getQualifierOffset(), SortOrder.ASC);
         checkQualifierRange(columnQualifier);
         int idx = getArrayIndex(columnQualifier);
         array[idx] = e;
@@ -92,7 +139,7 @@ public class BoundedSkipNullCellsList implements List<Cell> {
     @Override
     public boolean remove(Object o) {
         if (o == null) {
-            throw new NullPointerException();
+            return false;
         }
         Cell e = (Cell)o;
         int i = 0;
@@ -108,7 +155,7 @@ public class BoundedSkipNullCellsList implements List<Cell> {
                         i++;
                     }
                     if (i < array.length) {
-                        firstNonNullElementIdx = maxQualifier;
+                        firstNonNullElementIdx = i;
                     } else {
                         firstNonNullElementIdx = -1;
                     }
@@ -122,8 +169,12 @@ public class BoundedSkipNullCellsList implements List<Cell> {
 
     @Override
     public boolean containsAll(Collection<?> c) {
-        throwUnsupportedOperationException();
-        return false;
+        boolean containsAll = true;
+        Iterator<?> itr = c.iterator();
+        while (itr.hasNext()) {
+            containsAll &= (indexOf(itr.next()) >= 0);
+        }
+        return containsAll;
     }
 
     @Override
@@ -146,8 +197,12 @@ public class BoundedSkipNullCellsList implements List<Cell> {
 
     @Override
     public boolean removeAll(Collection<?> c) {
-        throwUnsupportedOperationException();
-        return false;
+        Iterator<?> itr = c.iterator();
+        boolean changed = false;
+        while (itr.hasNext()) {
+            changed |= remove(itr.next());
+        }
+        return changed;
     }
 
     @Override
@@ -161,39 +216,31 @@ public class BoundedSkipNullCellsList implements List<Cell> {
         for (int i = 0; i < array.length; i++) {
             array[i] = null;
         }
+        firstNonNullElementIdx = -1;
         numNonNullElements = 0;
     }
-
+    
     @Override
     public Cell get(int index) {
-        //TODO: samarth how can we support this? It is always assumed that the 
-        // user expects to get something back from the list and we would end up returning null
-        // here. Do we just add the 
-        throwUnsupportedOperationException();
         rangeCheck(index);
-        return array[index];
-    }
-    
-    public Cell getCellForColumnQualifier(int columnQualifier) {
-        int idx = getArrayIndex(columnQualifier);
-        return array[idx];
+        int numNonNullElementsFound = 0;
+        int i = 0;
+        for (; i < array.length; i++) {
+            if (array[i] != null) {
+                numNonNullElementsFound++;
+                if (numNonNullElementsFound - 1 == index) {
+                    break;
+                }
+            }
+            
+        }
+        return (numNonNullElementsFound - 1) != index ? null : array[i];
     }
 
     @Override
     public Cell set(int index, Cell element) {
-        //TODO: samarth how can we support this?
         throwUnsupportedOperationException();
-        if (element == null) {
-            throw new NullPointerException();
-        }
-        rangeCheck(index);
-        int idx = minQualifier + index;
-        Cell prev = array[idx];
-        array[idx] = element;
-        if (prev == null) {
-            numNonNullElements++;
-        }
-        return prev;
+        return null;
     }
 
     @Override
@@ -209,14 +256,28 @@ public class BoundedSkipNullCellsList implements List<Cell> {
 
     @Override
     public int indexOf(Object o) {
-        throwUnsupportedOperationException();
-        return 0;
+        if (o == null) {
+            return -1;
+        } else {
+            for (int i = 0; i < array.length; i++)
+                if (o.equals(array[i])) {
+                    return i;
+                }
+        }
+        return -1;
     }
 
     @Override
     public int lastIndexOf(Object o) {
-        throwUnsupportedOperationException();
-        return 0;
+        if (o == null) {
+            return -1;
+        }
+        for (int i = array.length - 1; i >=0 ; i--) {
+            if (o.equals(array[i])) {
+                return i;
+            }
+        }
+        return -1;
     }
 
     @Override
@@ -237,27 +298,51 @@ public class BoundedSkipNullCellsList implements List<Cell> {
         return null;
     }
     
-    private void checkQualifierRange(int qualifier) {
-        if (qualifier < minQualifier || qualifier > maxQualifier) {
-            throw new IndexOutOfBoundsException("Qualifier is out of the range. Min: " + minQualifier + " Max: " + maxQualifier);
+    @Override
+    public Iterator<Cell> iterator() {
+        return new Itr();
+    }
+    
+    public Cell getCellForColumnQualifier(int columnQualifier) {
+        checkQualifierRange(columnQualifier);
+        int idx = getArrayIndex(columnQualifier);
+        Cell c =  array[idx];
+        if (c == null) {
+            throw new NoSuchElementException("No element present for column qualifier: " + columnQualifier);
         }
+        return c;
     }
     
+    public Cell getFirstCell()  {
+        if (firstNonNullElementIdx == -1) {
+            throw new NoSuchElementException("No elements present in the list");
+        }
+        return array[firstNonNullElementIdx];
+    }
+
+    private void checkQualifierRange(int qualifier) {
+        if (qualifier < ENCODED_EMPTY_COLUMN_NAME || qualifier > maxQualifier) { 
+            throw new IndexOutOfBoundsException(
+                "Qualifier " + qualifier + " is out of the valid range. Reserved: " + RESERVED_RANGE + ". Table column qualifier range: ("
+                        + minQualifier + ", " + maxQualifier + ")"); 
+        }
+    }
+
     private void rangeCheck(int index) {
-        if (index < 0 || index >= array.length) {
+        if (index < 0 || index > size() - 1) {
             throw new IndexOutOfBoundsException();
         }
     }
     
-    private void throwUnsupportedOperationException() {
-        throw new UnsupportedOperationException("Operation not supported because Samarth didn't implement it");
+    private int getArrayIndex(int columnQualifier) {
+        return columnQualifier < ENCODED_CQ_COUNTER_INITIAL_VALUE ? columnQualifier : ENCODED_CQ_COUNTER_INITIAL_VALUE
+                + (columnQualifier - minQualifier);
     }
     
-    @Override
-    public Iterator<Cell> iterator() {
-        return new Itr();
+    private void throwUnsupportedOperationException() {
+        throw new UnsupportedOperationException("Operation cannot be supported because it potentially violates the invariance contract of this list implementation");
     }
-    
+
     private class Itr implements Iterator<Cell> {
         private Cell current;
         private int currentIdx = 0;
@@ -265,7 +350,7 @@ public class BoundedSkipNullCellsList implements List<Cell> {
         private Itr() {
             moveToNextNonNullCell(true);
         }
-        
+
         @Override
         public boolean hasNext() {
             return !exhausted;
@@ -285,7 +370,7 @@ public class BoundedSkipNullCellsList implements List<Cell> {
         public void remove() {
             throwUnsupportedOperationException();            
         }
-        
+
         private void moveToNextNonNullCell(boolean init) {
             int i = init ? 0 : currentIdx + 1;
             while (i < array.length && (current = array[i]) == null) {
@@ -298,41 +383,148 @@ public class BoundedSkipNullCellsList implements List<Cell> {
                 exhausted = true;
             }
         }
-        
+
     }
-    
-    public Cell getFirstCell()  {
-        if (firstNonNullElementIdx == -1) {
-            throw new IllegalStateException("List doesn't have any non-null cell present");
+
+    private class ListItr implements ListIterator<Cell> {
+        private int previousIndex;
+        private int nextIndex;
+        private Cell previous;
+        private Cell next;
+        
+        private ListItr() {
+            movePointersForward(true);
+            previous = null;
+            if (nextIndex != -1) {
+                next = array[nextIndex];
+            }
         }
-        return array[firstNonNullElementIdx];
-    }
-    
-    private int getArrayIndex(int columnQualifier) {
-        return columnQualifier - minQualifier;
+        
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public Cell next() {
+            Cell toReturn = next;
+            if (toReturn == null) {
+                throw new NoSuchElementException();
+            }
+            movePointersForward(false);
+            return toReturn;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return previous != null;
+        }
+
+        @Override
+        public Cell previous() {
+            Cell toReturn = previous;
+            if (toReturn == null) {
+                throw new NoSuchElementException();
+            }
+            movePointersBackward(false);
+            return toReturn;
+        }
+
+        @Override
+        public int nextIndex() {
+            return nextIndex;
+        }
+
+        @Override
+        public int previousIndex() {
+            return previousIndex;
+        }
+
+        @Override
+        public void remove() {
+            // TODO Auto-generated method stub
+            
+        }
+        
+        // TODO: samarth this is one of these ouch methods that can make our implementation frgaile.
+        // It is a non-optional method and can't really be supported 
+        @Override
+        public void set(Cell e) {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void add(Cell e) {
+            // TODO Auto-generated method stub
+            
+        }
+        
+        private void movePointersForward(boolean init) {
+            int i = init ? 0 : nextIndex;
+            if (!init) {
+                previousIndex = nextIndex;
+                previous = next;
+            } else {
+                previousIndex = -1;
+                previous = null;
+            }
+            while (i < array.length && (array[i]) == null) {
+                i++;
+            }
+            if (i < array.length) {
+                nextIndex = i;
+                next = array[i];
+            } else {
+                nextIndex = -1;
+                next = null;
+            }
+        }
+        
+        private void movePointersBackward(boolean init) {
+            int i = init ? 0 : previousIndex;
+        }
+        
     }
-    
-//    private Cell setCell(int columnQualifier, Cell e) {
-//        
-//    }
-    
+
     public static void main (String args[]) throws Exception {
-        BoundedSkipNullCellsList list = new BoundedSkipNullCellsList(0, 10); // list of eleven elements
+        BoundedSkipNullCellsList list = new BoundedSkipNullCellsList(11, 16); // list of 6 elements
         System.out.println(list.size());
+        
         byte[] row = Bytes.toBytes("row");
         byte[] cf = Bytes.toBytes("cf");
+        
+        // add elements in reserved range
         list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(0)));
         list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(5)));
         list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(10)));
+        System.out.println(list.size());
+        for (Cell c : list) {
+            //System.out.println(c);
+        }
         
+        // add elements in qualifier range
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(12)));
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(14)));
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(16)));
+        System.out.println(list.size());
         for (Cell c : list) {
-            System.out.println(c);
+            //System.out.println(c);
         }
+        
+        list.add(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(11)));
         System.out.println(list.size());
+        for (Cell c : list) {
+            //System.out.println(c);
+        }
+        
         System.out.println(list.get(0));
-        System.out.println(list.get(5));
-        System.out.println(list.get(10));
         System.out.println(list.get(1));
+        System.out.println(list.get(2));
+        System.out.println(list.get(3));
+        System.out.println(list.get(4));
+        System.out.println(list.get(5));
+        System.out.println(list.get(6));
         System.out.println(list.remove(KeyValue.createFirstOnRow(row, cf, PInteger.INSTANCE.toBytes(5))));
         System.out.println(list.get(5));
         System.out.println(list.size());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
index a1fe549..8c41844 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/PositionBasedMultiKeyValueTuple.java
@@ -17,10 +17,13 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PInteger;
 
 /**
@@ -31,19 +34,15 @@ public class PositionBasedMultiKeyValueTuple extends BaseTuple {
 
     public PositionBasedMultiKeyValueTuple() {}
     
-//    public PositionBasedMultiKeyValueTuple(List<Cell> values, int minQualifier, int maxQualifier) {
-//        this.values = new BoundedSkipNullCellsList(minQualifier, maxQualifier);
-//        setKeyValues(values);
-//    }
-    
-//    public PositionBasedMultiKeyValueTuple(int minQualifier, int maxQualifier){
-//        this.values = new BoundedSkipNullCellsList(minQualifier, maxQualifier);
-//    }
+    public PositionBasedMultiKeyValueTuple(List<Cell> values) {
+        checkArgument(values instanceof BoundedSkipNullCellsList, "PositionBasedMultiKeyValueTuple only works with lists of type BoundedSkipNullCellsList");
+        this.values = (BoundedSkipNullCellsList)values;
+    }
     
     /** Caller must not modify the list that is passed here */
     @Override
     public void setKeyValues(List<Cell> values) {
-        assert values instanceof BoundedSkipNullCellsList;
+        checkArgument(values instanceof BoundedSkipNullCellsList, "PositionBasedMultiKeyValueTuple only works with lists of type BoundedSkipNullCellsList");
         this.values = (BoundedSkipNullCellsList)values;
     }
 
@@ -60,7 +59,7 @@ public class PositionBasedMultiKeyValueTuple extends BaseTuple {
 
     @Override
     public Cell getValue(byte[] family, byte[] qualifier) {
-        return values.getCellForColumnQualifier((int)PInteger.INSTANCE.toObject(qualifier));
+        return values.getCellForColumnQualifier(PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.ASC));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
index c28a2bf..7f2873a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ResultTuple.java
@@ -17,33 +17,44 @@
  */
 package org.apache.phoenix.schema.tuple;
 
+import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE;
+
+import java.util.Collections;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.KeyValueUtil;
 
-
+/**
+ * 
+ * Wrapper around {@link Result} that implements Phoenix's {@link Tuple} interface.
+ *
+ */
 public class ResultTuple extends BaseTuple {
-    private Result result;
+    private final Result result;
+    public static final ResultTuple EMPTY_TUPLE = new ResultTuple(Result.create(Collections.<Cell>emptyList()));
     
+    //TODO: samarth see if we can get rid of this constructor altogether.
     public ResultTuple(Result result) {
         this.result = result;
     }
     
-    public ResultTuple() {
-    }
+//    public ResultTuple(Result result, boolean useQualifierAsIndex) {
+//        this.result = result;
+//        this.useQualifierAsIndex = useQualifierAsIndex;
+//    }
     
     public Result getResult() {
         return this.result;
     }
 
-    public void setResult(Result result) {
-        this.result = result;
-    }
-    
     @Override
     public void getKey(ImmutableBytesWritable ptr) {
         ptr.set(result.getRow());
@@ -56,6 +67,12 @@ public class ResultTuple extends BaseTuple {
 
     @Override
     public KeyValue getValue(byte[] family, byte[] qualifier) {
+//        if (useQualifierAsIndex) {
+//            int index = PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.ASC);
+//            //TODO: samarth this seems like a hack here at this place. Think more. Maybe we should use a new tuple here?
+//            index = index >= ENCODED_CQ_COUNTER_INITIAL_VALUE ? (index - ENCODED_CQ_COUNTER_INITIAL_VALUE) : index;
+//            return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.rawCells()[index]);
+//        }
         Cell cell = KeyValueUtil.getColumnLatest(GenericKeyValueBuilder.INSTANCE, 
           result.rawCells(), family, qualifier);
         return org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(cell);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 49ac1f4..2df5cd6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.util;
 
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -521,14 +524,15 @@ public class IndexUtil {
             }
             
             // TODO: handle null case (but shouldn't happen)
+            //TODO: samarth confirm if this is the right thing to do here i.e. pass false for look up.
             Tuple joinTuple = new ResultTuple(joinResult);
             // This will create a byte[] that captures all of the values from the data table
             byte[] value =
                     tupleProjector.getSchema().toBytes(joinTuple, tupleProjector.getExpressions(),
                         tupleProjector.getValueBitSet(), ptr);
             KeyValue keyValue =
-                    KeyValueUtil.newKeyValue(firstCell.getRowArray(),firstCell.getRowOffset(),firstCell.getRowLength(), TupleProjector.VALUE_COLUMN_FAMILY,
-                        TupleProjector.VALUE_COLUMN_QUALIFIER, firstCell.getTimestamp(), value, 0, value.length);
+                    KeyValueUtil.newKeyValue(firstCell.getRowArray(),firstCell.getRowOffset(),firstCell.getRowLength(), VALUE_COLUMN_FAMILY,
+                        VALUE_COLUMN_QUALIFIER, firstCell.getTimestamp(), value, 0, value.length);
             result.add(keyValue);
         }
         for (int i = 0; i < result.size(); i++) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java
index dba6550..f97230b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ResultUtil.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.util;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
@@ -129,63 +128,4 @@ public class ResultUtil {
         return Bytes.compareTo(getRawBytes(r1), getKeyOffset(r1), getKeyLength(r1), getRawBytes(r2), getKeyOffset(r2), getKeyLength(r2));
     }
 
-    /**
-     * Binary search for latest column value without allocating memory in the process
-     */
-    public static KeyValue getColumnLatest(Result r, byte[] family, byte[] qualifier) {
-        byte[] rbytes = getRawBytes(r);
-        int roffset = getKeyOffset(r);
-        int rlength = getKeyLength(r);
-        return getColumnLatest(r, rbytes, roffset, rlength, family, 0, family.length, qualifier, 0, qualifier.length);
-    }
-
-    public static KeyValue getSearchTerm(Result r, byte[] family, byte[] qualifier) {
-        byte[] rbytes = getRawBytes(r);
-        int roffset = getKeyOffset(r);
-        int rlength = getKeyLength(r);
-        return KeyValue.createFirstOnRow(rbytes, roffset, rlength, family, 0, family.length, qualifier, 0, qualifier.length);
-    }
-    /**
-     * Binary search for latest column value without allocating memory in the process
-     */
-    public static KeyValue getColumnLatest(Result r, byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength, byte[] qualifier, int qoffset, int qlength) {
-        KeyValue searchTerm = KeyValue.createFirstOnRow(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength);
-        return getColumnLatest(r,searchTerm);
-        
-    }
-
-     /**
-     * Binary search for latest column value without allocating memory in the process
-     * @param r
-     * @param searchTerm
-     */
-    @SuppressWarnings("deprecation")
-    public static KeyValue getColumnLatest(Result r, KeyValue searchTerm) {
-        KeyValue [] kvs = r.raw(); // side effect possibly.
-        if (kvs == null || kvs.length == 0) {
-          return null;
-        }
-        
-        // pos === ( -(insertion point) - 1)
-        int pos = Arrays.binarySearch(kvs, searchTerm, KeyValue.COMPARATOR);
-        // never will exact match
-        if (pos < 0) {
-          pos = (pos+1) * -1;
-          // pos is now insertion point
-        }
-        if (pos == kvs.length) {
-          return null; // doesn't exist
-        }
-
-        KeyValue kv = kvs[pos];
-        if (Bytes.compareTo(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
-                searchTerm.getBuffer(), searchTerm.getFamilyOffset(), searchTerm.getFamilyLength()) != 0) {
-            return null;
-        }
-        if (Bytes.compareTo(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(),
-                searchTerm.getBuffer(), searchTerm.getQualifierOffset(), searchTerm.getQualifierLength()) != 0) {
-            return null;
-        }
-        return kv;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 8174f7b..23df3fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema.Field;
@@ -825,5 +826,13 @@ public class ScanUtil {
         }
         return new Pair<>(minQ, maxQ);
     }
+    
+    public static boolean useQualifierAsIndex(Pair<Integer, Integer> minMaxQualifiers, boolean isJoin) {
+        return minMaxQualifiers != null && !isJoin;
+    }
+    
+    public static boolean setMinMaxQualifiersOnScan(PTable table) {
+        return EncodedColumnsUtil.usesEncodedColumnNames(table) && !table.isTransactional();
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 944dda0..8970469 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.execute;
 
+import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
@@ -55,6 +56,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -247,7 +249,7 @@ public class CorrelatePlanTest {
         for (int i = 0; i < row.length; i++) {
             String name = ParseNodeFactory.createTempAlias();
             Expression expr = LiteralExpression.newConstant(row[i]);
-            columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
+            columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(VALUE_COLUMN_FAMILY),
                     expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
                     i, expr.getSortOrder(), null, null, false, name, false, false, null));
         }