Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/03/21 21:53:43 UTC

[1/3] phoenix git commit: Replace empty key value with unit tests passing [Forced Update!]

Repository: phoenix
Updated Branches:
  refs/heads/encodecolumns 056694293 -> 42294c4ba (forced update)


http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index bb90c16..b8a3dc0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.schema;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.addQuietly;
 import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.deleteQuietly;
 import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
@@ -31,6 +32,8 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import co.cask.tephra.TxConstants;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
@@ -59,6 +62,7 @@ import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -66,8 +70,6 @@ import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
-import co.cask.tephra.TxConstants;
-
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -108,7 +110,8 @@ public class PTableImpl implements PTable {
     private List<PColumnFamily> families;
     private Map<byte[], PColumnFamily> familyByBytes;
     private Map<String, PColumnFamily> familyByString;
-    private ListMultimap<String,PColumn> columnsByName;
+    private ListMultimap<String, PColumn> columnsByName;
+    private ListMultimap<Integer, PColumn> kvColumnsByColumnQualifiers;
     private PName pkName;
     private Integer bucketNum;
     private RowKeySchema rowKeySchema;
@@ -138,6 +141,7 @@ public class PTableImpl implements PTable {
     private boolean hasColumnsRequiringUpgrade; // TODO: remove when required that tables have been upgrade for PHOENIX-2067
     private int rowTimestampColPos;
     private long updateCacheFrequency;
+    private StorageScheme storageScheme;
 
     public PTableImpl() {
         this.indexes = Collections.emptyList();
@@ -208,7 +212,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
 
     public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
@@ -217,7 +221,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
@@ -226,7 +230,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
                 table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -235,7 +239,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
@@ -245,7 +249,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -255,7 +259,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
 
     public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
@@ -265,7 +269,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
-                table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
 
     public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException {
@@ -275,7 +279,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats,
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -284,12 +288,12 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            long indexDisableTimestamp) throws SQLException {
+            long indexDisableTimestamp, StorageScheme storageScheme) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
                 indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional,
-                updateCacheFrequency,indexDisableTimestamp);
+                updateCacheFrequency,indexDisableTimestamp, storageScheme);
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
@@ -298,12 +302,12 @@ public class PTableImpl implements PTable {
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
             IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
-            @NotNull PTableStats stats, int baseColumnCount, long indexDisableTimestamp)
+            @NotNull PTableStats stats, int baseColumnCount, long indexDisableTimestamp, StorageScheme storageScheme)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
                 defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
-                indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, indexDisableTimestamp);
+                indexType, stats, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme);
     }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -311,11 +315,11 @@ public class PTableImpl implements PTable {
             PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
-            PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp) throws SQLException {
+            PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, StorageScheme storageScheme) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-                isTransactional, updateCacheFrequency, indexDisableTimestamp);
+                isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme);
     }
     
     @Override
@@ -348,7 +352,7 @@ public class PTableImpl implements PTable {
             PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
-            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp) throws SQLException {
+            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, StorageScheme storageScheme) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -382,10 +386,12 @@ public class PTableImpl implements PTable {
         this.tableStats = stats;
         this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
         this.updateCacheFrequency = updateCacheFrequency;
+        this.storageScheme = storageScheme;
         List<PColumn> pkColumns;
         PColumn[] allColumns;
         
         this.columnsByName = ArrayListMultimap.create(columns.size(), 1);
+        this.kvColumnsByColumnQualifiers = (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES ? ArrayListMultimap.<Integer, PColumn>create(columns.size(), 1) : null);
         int numPKColumns = 0;
         if (bucketNum != null) {
             // Add salt column to allColumns and pkColumns, but don't add to
@@ -412,7 +418,22 @@ public class PTableImpl implements PTable {
                     if (Objects.equal(familyName, dupColumn.getFamilyName())) {
                         count++;
                         if (count > 1) {
-                            throw new ColumnAlreadyExistsException(null, name.getString(), columnName);
+                            throw new ColumnAlreadyExistsException(schemaName.getString(), name.getString(), columnName);
+                        }
+                    }
+                }
+            }
+            Integer cq = column.getColumnQualifier();
+            //TODO: samarth understand the implication of this.
+            if (kvColumnsByColumnQualifiers != null && cq != null) {
+                if (kvColumnsByColumnQualifiers.put(cq, column)) {
+                    int count = 0;
+                    for (PColumn dupColumn : kvColumnsByColumnQualifiers.get(cq)) {
+                        if (Objects.equal(familyName, dupColumn.getFamilyName())) {
+                            count++;
+                            if (count > 1) {
+                                throw new ColumnAlreadyExistsException(schemaName.getString(), name.getString(), columnName);
+                            }
                         }
                     }
                 }
@@ -477,7 +498,7 @@ public class PTableImpl implements PTable {
                 .orderedBy(Bytes.BYTES_COMPARATOR);
         for (int i = 0; i < families.length; i++) {
             Map.Entry<PName,List<PColumn>> entry = iterator.next();
-            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue());
+            PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue(), storageScheme == StorageScheme.ENCODED_COLUMN_NAMES);
             families[i] = family;
             familyByString.put(family.getName().getString(), family);
             familyByBytes.put(family.getName().getBytes(), family);
@@ -663,7 +684,7 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+    public PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
         List<PColumn> columns = columnsByName.get(name);
         int size = columns.size();
         if (size == 0) {
@@ -682,6 +703,35 @@ public class PTableImpl implements PTable {
         }
         return columns.get(0);
     }
+    
+    @Override
+    public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+        if (SchemaUtil.usesEncodedColumnNames(this)) {
+            String columnName = (String)PVarchar.INSTANCE.toObject(cq);
+            return getPColumnForColumnName(columnName);
+        } else {
+            Integer qualifier = (Integer)PInteger.INSTANCE.toObject(cq);
+            List<PColumn> columns = kvColumnsByColumnQualifiers.get(qualifier);
+            int size = columns.size();
+            if (size == 0) {
+                //TODO: samarth should we have a column qualifier not found exception?
+                throw new ColumnNotFoundException(Bytes.toString(cq));
+            }
+            if (size > 1) {
+                for (PColumn column : columns) {
+                    if (column.getFamilyName() == null || QueryConstants.DEFAULT_COLUMN_FAMILY.equals(column.getFamilyName().getString())) {
+                        // Allow ambiguity with PK column or column in the default column family,
+                        // since a PK column cannot be prefixed and a user would not know how to
+                        // prefix a column in the default column family.
+                        return column;
+                    }
+                }
+                //TODO: samarth should we have a column qualifier not found exception?
+                throw new AmbiguousColumnException(columns.get(0).getName().getString());
+            }
+            return columns.get(0);
+        }
+    }
 
     /**
      *
@@ -774,7 +824,8 @@ public class PTableImpl implements PTable {
         public void setValue(PColumn column, byte[] byteValue) {
             deleteRow = null;
             byte[] family = column.getFamilyName().getBytes();
-            byte[] qualifier = column.getName().getBytes();
+            byte[] qualifier = getColumnQualifier(column);
+            ImmutableBytesPtr qualifierPtr = getColumnQualifierPtr(column);
             PDataType type = column.getDataType();
             // Check null, since some types have no byte representation for null
             boolean isNull = type.isNull(byteValue);
@@ -784,7 +835,7 @@ public class PTableImpl implements PTable {
                 }
                 removeIfPresent(setValues, family, qualifier);
                 deleteQuietly(unsetValues, kvBuilder, kvBuilder.buildDeleteColumns(keyPtr, column
-                            .getFamilyName().getBytesPtr(), column.getName().getBytesPtr(), ts));
+                            .getFamilyName().getBytesPtr(), qualifierPtr, ts));
             } else {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable(byteValue == null ?
                         HConstants.EMPTY_BYTE_ARRAY : byteValue);
@@ -798,7 +849,7 @@ public class PTableImpl implements PTable {
             	}
                 removeIfPresent(unsetValues, family, qualifier);
                 addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr,
-                        column.getFamilyName().getBytesPtr(), column.getName().getBytesPtr(),
+                        column.getFamilyName().getBytesPtr(), qualifierPtr,
                         ts, ptr));
             }
         }
@@ -827,6 +878,26 @@ public class PTableImpl implements PTable {
                 deleteRow.setDurability(Durability.SKIP_WAL);
             }
         }
+        
+        private byte[] getColumnQualifier(PColumn column) {
+            //return column.getName().getBytes();
+            checkArgument(!SchemaUtil.isPKColumn(column), "No column qualifiers for PK columns");
+            boolean tableUsesEncodedColumnNames = SchemaUtil.usesEncodedColumnNames(PTableImpl.this);
+            if (!tableUsesEncodedColumnNames) {
+                return column.getName().getBytes();
+            }
+            return PInteger.INSTANCE.toBytes(column.getColumnQualifier());
+        }
+        
+        private ImmutableBytesPtr getColumnQualifierPtr(PColumn column) {
+//            return column.getName().getBytesPtr();
+            checkArgument(!SchemaUtil.isPKColumn(column), "No column qualifiers for PK columns");
+            boolean tableUsesEncodedColumnNames = SchemaUtil.usesEncodedColumnNames(PTableImpl.this);
+            if (!tableUsesEncodedColumnNames) {
+                return column.getName().getBytesPtr();
+            }
+            return new ImmutableBytesPtr(PInteger.INSTANCE.toBytes(column.getColumnQualifier()));
+        }
     }
 
     @Override
@@ -1079,14 +1150,17 @@ public class PTableImpl implements PTable {
       if (table.hasUpdateCacheFrequency()) {
           updateCacheFrequency = table.getUpdateCacheFrequency();
       }
-      
+      StorageScheme storageScheme = null;
+      if (table.hasStorageScheme()) {
+          storageScheme = StorageScheme.fromSerializedValue(table.getStorageScheme().toByteArray()[0]);
+      }
       try {
         PTableImpl result = new PTableImpl();
         result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
             (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes,
             isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
             multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-            isTransactional, updateCacheFrequency, indexDisableTimestamp);
+            isTransactional, updateCacheFrequency, indexDisableTimestamp, storageScheme);
         return result;
       } catch (SQLException e) {
         throw new RuntimeException(e); // Impossible
@@ -1177,7 +1251,9 @@ public class PTableImpl implements PTable {
       builder.setRowKeyOrderOptimizable(table.rowKeyOrderOptimizable());
       builder.setUpdateCacheFrequency(table.getUpdateCacheFrequency());
       builder.setIndexDisableTimestamp(table.getIndexDisableTimestamp());
-      
+      if (table.getStorageScheme() != null) {
+          builder.setStorageScheme(ByteStringer.wrap(new byte[]{table.getStorageScheme().getSerializedValue()}));
+      }
       return builder.build();
     }
 
@@ -1201,6 +1277,7 @@ public class PTableImpl implements PTable {
         return isTransactional;
     }
 
+    @Override
     public int getBaseColumnCount() {
         return baseColumnCount;
     }
@@ -1214,4 +1291,9 @@ public class PTableImpl implements PTable {
     public int getRowTimestampColPos() {
         return rowTimestampColPos;
     }
+    
+    @Override
+    public StorageScheme getStorageScheme() {
+        return storageScheme;
+    }
 }
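
For orientation, a minimal standalone sketch (not part of this patch; the class and method names below are illustrative only) of the qualifier dispatch the hunk above introduces in PTableImpl.PRow: tables on the ENCODED_COLUMN_NAMES storage scheme write the PInteger-serialized qualifier counter as the cell qualifier, while other tables keep writing the column name bytes.

    // Sketch only (not part of this commit); mirrors PTableImpl.PRow#getColumnQualifier.
    import org.apache.phoenix.schema.PColumn;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.types.PInteger;
    import org.apache.phoenix.util.SchemaUtil;

    class QualifierDispatchSketch {
        static byte[] qualifierFor(PTable table, PColumn column) {
            if (SchemaUtil.isPKColumn(column)) {
                // PK columns are serialized into the row key and never get a KeyValue qualifier.
                throw new IllegalArgumentException("No column qualifiers for PK columns");
            }
            return SchemaUtil.usesEncodedColumnNames(table)
                    ? PInteger.INSTANCE.toBytes(column.getColumnQualifier())  // encoded scheme: integer counter
                    : column.getName().getBytes();                            // legacy scheme: column name bytes
        }
    }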

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
index 19dd1c1..9336938 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ProjectedColumn.java
@@ -39,6 +39,7 @@ public class ProjectedColumn extends DelegateColumn {
         return name;
     }
     
+    @Override
     public PName getFamilyName() {
         return familyName;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
index 734a9ed..23cfd1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java
@@ -38,7 +38,7 @@ public class SaltingUtil {
     public static final String SALTING_COLUMN_NAME = "_SALT";
     public static final String SALTED_ROW_KEY_NAME = "_SALTED_KEY";
     public static final PColumnImpl SALTING_COLUMN = new PColumnImpl(
-            PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false);
+            PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false, null);
     public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(2)
         .addField(SALTING_COLUMN, false, SortOrder.getDefault())
         .addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 45d284d..587473f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -176,7 +176,7 @@ public class IndexUtil {
             throw new IllegalArgumentException("Could not find column family \"" +  indexColumnName.substring(0, pos) + "\" in index column name of \"" + indexColumnName + "\"", e);
         }
         try {
-            return family.getColumn(indexColumnName.substring(pos+1));
+            return family.getPColumnForColumnName(indexColumnName.substring(pos+1));
         } catch (ColumnNotFoundException e) {
             throw new IllegalArgumentException("Could not find column \"" +  indexColumnName.substring(pos+1) + "\" in index column name of \"" + indexColumnName + "\"", e);
         }
@@ -203,10 +203,11 @@ public class IndexUtil {
 
     private static boolean isEmptyKeyValue(PTable table, ColumnReference ref) {
         byte[] emptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(table);
+        byte[] emptyKeyValueQualifier = SchemaUtil.getEmptyKeyValueInfo(table).getFirst();
         return (Bytes.compareTo(emptyKeyValueCF, 0, emptyKeyValueCF.length, ref.getFamilyWritable()
                 .get(), ref.getFamilyWritable().getOffset(), ref.getFamilyWritable().getLength()) == 0 && Bytes
-                .compareTo(QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                    QueryConstants.EMPTY_COLUMN_BYTES.length, ref.getQualifierWritable().get(), ref
+                .compareTo(emptyKeyValueQualifier, 0,
+                        emptyKeyValueQualifier.length, ref.getQualifierWritable().get(), ref
                             .getQualifierWritable().getOffset(), ref.getQualifierWritable()
                             .getLength()) == 0);
     }
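
A hedged usage sketch (the helper class name is hypothetical): the empty key value's qualifier is no longer the fixed QueryConstants.EMPTY_COLUMN_BYTES but is resolved per table through SchemaUtil.getEmptyKeyValueInfo, which the isEmptyKeyValue check above now consults.

    // Sketch only (not part of this commit): the empty key value coordinates that
    // isEmptyKeyValue(...) above now resolves per table instead of via fixed constants.
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.util.SchemaUtil;

    class EmptyKeyValueSketch {
        static void describe(PTable table) {
            byte[] family = SchemaUtil.getEmptyColumnFamily(table);
            Pair<byte[], byte[]> info = SchemaUtil.getEmptyKeyValueInfo(table);
            // getFirst(): the qualifier to look for; getSecond(): the value written with it.
            System.out.println("empty KV family    = " + Bytes.toStringBinary(family));
            System.out.println("empty KV qualifier = " + Bytes.toStringBinary(info.getFirst()));
            System.out.println("empty KV value     = " + Bytes.toStringBinary(info.getSecond()));
        }
    }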

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 508446c..021a57c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -475,9 +475,9 @@ public class PhoenixRuntime {
             String familyName = tokens[0];
             String familyColumn = tokens[1];
             PColumnFamily family = table.getColumnFamily(familyName);
-            pColumn = family.getColumn(familyColumn);
+            pColumn = family.getPColumnForColumnName(familyColumn);
         } else {
-            pColumn = table.getColumn(columnName);
+            pColumn = table.getPColumnForColumnName(columnName);
         }
         return getColumnInfo(pColumn);
     }
@@ -962,9 +962,9 @@ public class PhoenixRuntime {
         PColumn pColumn = null;
         if (familyName != null) {
             PColumnFamily family = table.getColumnFamily(familyName);
-            pColumn = family.getColumn(columnName);
+            pColumn = family.getPColumnForColumnName(columnName);
         } else {
-            pColumn = table.getColumn(columnName);
+            pColumn = table.getPColumnForColumnName(columnName);
         }
         return pColumn;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 523f684..b6c5f72 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -44,6 +44,7 @@ import javax.annotation.Nullable;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.expression.Expression;
@@ -60,6 +61,7 @@ import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
 import org.apache.phoenix.schema.SaltingUtil;
@@ -67,6 +69,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableProperty;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 
@@ -143,8 +146,9 @@ public class SchemaUtil {
     			rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, column.getFamilyName().getBytes().length, column.getName().getBytes().length, valueLength);
     		}
     	}
+    	byte[] emptyKeyValueKV = SchemaUtil.getEmptyKeyValueInfo(table).getFirst();
     	// Empty key value
-    	rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, getEmptyColumnFamily(table).length, QueryConstants.EMPTY_COLUMN_BYTES.length, 0);
+    	rowSize += KeyValue.getKeyValueDataStructureSize(keyLength, getEmptyColumnFamily(table).length, emptyKeyValueKV.length, 0);
     	return rowSize;
     }
     
@@ -364,7 +368,7 @@ public class SchemaUtil {
                 }
             } else {
                 try {
-                    return table.getColumnFamily(familyName.getString()).getColumn(column.getName().getString()).getName().getString();
+                    return table.getColumnFamily(familyName.getString()).getPColumnForColumnName(column.getName().getString()).getName().getString();
                 } catch (ColumnFamilyNotFoundException e) {
                     continue; // Shouldn't happen
                 } catch (ColumnNotFoundException e) {
@@ -556,7 +560,7 @@ public class SchemaUtil {
     
     public static boolean columnExists(PTable table, String columnName) {
         try {
-            table.getColumn(columnName);
+            table.getPColumnForColumnName(columnName);
             return true;
         } catch (ColumnNotFoundException e) {
             return false;
@@ -897,4 +901,53 @@ public class SchemaUtil {
         PName schemaName = dataTable.getSchemaName();
         return getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : schemaName.getBytes(), dataTable.getTableName().getBytes());
     }
+    
+    
+    /**
+     * Return a map of column family -> next column qualifier number to use.
+     */
+    public static Map<String, Integer> getNextColumnQualifiers(PTable table) {
+        Map<String, Integer> map = Maps.newHashMapWithExpectedSize(table.getColumns().size());
+        for (PColumnFamily f : table.getColumnFamilies()) {
+            final int size = f.getColumns().size();
+            // column qualifiers start with 1.
+            map.put(f.getName().getString(), size + 1);
+        }
+        return map;
+    }
+    
+    public static boolean usesEncodedColumnNames(PTable table) {
+        return table.getStorageScheme() != null && table.getStorageScheme() == StorageScheme.ENCODED_COLUMN_NAMES;
+    }
+    
+    public static byte[] getColumnQualifier(PColumn column, PTable table) {
+      checkArgument(!SchemaUtil.isPKColumn(column), "No column qualifiers for PK columns");
+      return usesEncodedColumnNames(table) ? PInteger.INSTANCE.toBytes(column.getColumnQualifier()) : column.getName().getBytes();
+    }
+    
+    public static byte[] getColumnQualifier(PColumn column, boolean encodedColumnName) {
+        checkArgument(!SchemaUtil.isPKColumn(column), "No column qualifiers for PK columns");
+        return encodedColumnName ? PInteger.INSTANCE.toBytes(column.getColumnQualifier()) : column.getName().getBytes(); 
+    }
+    
+    /**
+     * @return pair of byte arrays. The first part of the pair is the empty key value's column qualifier, and the second
+     *         part is the value to use for it.
+     */
+    public static Pair<byte[], byte[]> getEmptyKeyValueInfo(PTable table) {
+        return usesEncodedColumnNames(table) ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
+                QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+    }
+    
+    /**
+     * @return pair of byte arrays. The first part of the pair is the empty key value's column qualifier, and the second
+     *         part is the value to use for it.
+     */
+    public static Pair<byte[], byte[]> getEmptyKeyValueInfo(boolean usesEncodedColumnNames) {
+        return usesEncodedColumnNames ? new Pair<>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
+                QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES) : new Pair<>(QueryConstants.EMPTY_COLUMN_BYTES,
+                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+    }
+    
 }
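
One more illustrative sketch (not from the commit): reading back the per-family counters that getNextColumnQualifiers computes. A family with n key value columns reports n + 1, since qualifier numbering starts at 1; the getColumnQualifier(PColumn, boolean) overload above applies the same encoded-vs-name dispatch when only a flag, rather than a PTable, is at hand.

    // Sketch only (not part of this commit): dumping the per-family counters that
    // SchemaUtil.getNextColumnQualifiers(...) above computes.
    import java.util.Map;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.util.SchemaUtil;

    class NextQualifierSketch {
        static void dump(PTable table) {
            Map<String, Integer> next = SchemaUtil.getNextColumnQualifiers(table);
            for (Map.Entry<String, Integer> e : next.entrySet()) {
                // A family with n key value columns reports n + 1 as the next free qualifier.
                System.out.println(e.getKey() + " -> next qualifier " + e.getValue());
            }
        }
    }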

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/compile/HavingCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/HavingCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/HavingCompilerTest.java
index 1c7477d..ae2bd14 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/HavingCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/HavingCompilerTest.java
@@ -156,7 +156,7 @@ public class HavingCompilerTest extends BaseConnectionlessQueryTest {
         String query = "select count(1) from atable group by a_string having count(1) >= 1 or a_string = 'foo'";
         List<Object> binds = Collections.emptyList();
         Expressions expressions = compileStatement(query,binds);
-        PColumn aCol = ATABLE.getColumn("A_STRING");
+        PColumn aCol = ATABLE.getPColumnForColumnName("A_STRING");
         Expression h = or(
                 constantComparison(CompareOp.GREATER_OR_EQUAL, new CountAggregateFunction(),1L),
                 constantComparison(CompareOp.EQUAL, 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index ce38cfd..11a7b56 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -163,7 +163,7 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             String query = "CREATE TABLE t1 (k integer not null primary key, a.k decimal, b.k decimal)";
             conn.createStatement().execute(query);
             PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-            PColumn c = pconn.getTable(new PTableKey(pconn.getTenantId(), "T1")).getColumn("K");
+            PColumn c = pconn.getTable(new PTableKey(pconn.getTenantId(), "T1")).getPColumnForColumnName("K");
             assertTrue(SchemaUtil.isPKColumn(c));
         } finally {
             conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
index 2f1a369..d7d7d02 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
@@ -67,8 +67,10 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarchar;
@@ -118,9 +120,9 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest {
         QueryPlan plan = pstmt.optimizeQuery();
         Scan scan = plan.getContext().getScan();
         Filter filter = scan.getFilter();
-        ColumnExpression idExpression = new ColumnRef(plan.getTableRef(), plan.getTableRef().getTable().getColumn("ID").getPosition()).newColumnExpression();
+        ColumnExpression idExpression = new ColumnRef(plan.getTableRef(), plan.getTableRef().getTable().getPColumnForColumnName("ID").getPosition()).newColumnExpression();
         Expression id = new RowKeyColumnExpression(idExpression,new RowKeyValueAccessor(plan.getTableRef().getTable().getPKColumns(),0));
-        Expression company = new KeyValueColumnExpression(plan.getTableRef().getTable().getColumn("COMPANY"));
+        Expression company = new KeyValueColumnExpression(plan.getTableRef().getTable().getPColumnForColumnName("COMPANY"), true);
         // FilterList has no equals implementation
         assertTrue(filter instanceof FilterList);
         FilterList filterList = (FilterList)filter;
@@ -148,11 +150,11 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest {
         QueryPlan plan = pstmt.optimizeQuery();
         Scan scan = plan.getContext().getScan();
         Filter filter = scan.getFilter();
-        PColumn column = plan.getTableRef().getTable().getColumn("COMPANY");
+        PColumn column = plan.getTableRef().getTable().getPColumnForColumnName("COMPANY");
         assertEquals(
                 singleKVFilter(constantComparison(
                     CompareOp.EQUAL,
-                    new KeyValueColumnExpression(column),
+                    new KeyValueColumnExpression(column, true),
                     "c3")),
                 filter);
     }
@@ -939,16 +941,18 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest {
         QueryPlan plan = pstmt.optimizeQuery();
         Scan scan = plan.getContext().getScan();
         Filter filter = scan.getFilter();
-
+        PTable table = plan.getTableRef().getTable();
+        ColumnExpression aInteger = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_INTEGER").getPosition()).newColumnExpression();
+        ColumnExpression aString = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_STRING").getPosition()).newColumnExpression();
         assertEquals(
             multiKVFilter(and(
                 constantComparison(
                     CompareOp.EQUAL,
-                    A_INTEGER,
+                    aInteger,
                     0),
                 constantComparison(
                     CompareOp.EQUAL,
-                    A_STRING,
+                    aString,
                     "foo"))),
             filter);
         
@@ -971,16 +975,18 @@ public class WhereCompilerTest extends BaseConnectionlessQueryTest {
         QueryPlan plan = pstmt.optimizeQuery();
         Scan scan = plan.getContext().getScan();
         Filter filter = scan.getFilter();
-
+        PTable table = plan.getTableRef().getTable();
+        ColumnExpression aInteger = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_INTEGER").getPosition()).newColumnExpression();
+        ColumnExpression aString = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_STRING").getPosition()).newColumnExpression();
         assertEquals(
             multiKVFilter(and(
                 constantComparison(
                     CompareOp.EQUAL,
-                    A_INTEGER,
+                    aInteger,
                     0),
                 constantComparison(
                     CompareOp.EQUAL,
-                    A_STRING,
+                    aString,
                     "foo"))),
             filter);
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index a8757ab..6b4cd9b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -226,14 +226,14 @@ public class CorrelatePlanTest {
             Expression expr = LiteralExpression.newConstant(row[i]);
             columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
                     expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
-                    i, expr.getSortOrder(), null, null, false, name, false, false));
+                    i, expr.getSortOrder(), null, null, false, name, false, false, null));
         }
         try {
             PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
                     PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                     null, null, columns, null, null, Collections.<PTable>emptyList(),
                     false, Collections.<PName>emptyList(), null, null, false, false, false, null,
-                    null, null, true, false, 0, 0L);
+                    null, null, true, false, 0, 0L, null);
             TableRef sourceTable = new TableRef(pTable);
             List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
             for (PColumn column : sourceTable.getTable().getColumns()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
index 8b2b096..93203a5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
@@ -116,8 +116,8 @@ public class UnnestArrayPlanTest {
         LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType);
         RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0);
         UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality);
-        PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false);
-        PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false) : null;
+        PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false, null);
+        PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false, null) : null;
         List<PColumn> columns = withOrdinality ? Arrays.asList(elemColumn, indexColumn) : Arrays.asList(elemColumn);
         ProjectedColumnExpression elemExpr = new ProjectedColumnExpression(elemColumn, columns, 0, elemColumn.getName().getString());
         ProjectedColumnExpression indexExpr = withOrdinality ? new ProjectedColumnExpression(indexColumn, columns, 1, indexColumn.getName().getString()) : null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java
index 7ee579c..98c2495 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java
@@ -41,8 +41,8 @@ public class ColumnExpressionTest {
         int maxLen = 30;
         int scale = 5;
         PColumn column = new PColumnImpl(PNameFactory.newName("c1"), PNameFactory.newName("f1"), PDecimal.INSTANCE, maxLen, scale,
-                true, 20, SortOrder.getDefault(), 0, null, false, null, false, false);
-        ColumnExpression colExp = new KeyValueColumnExpression(column);
+                true, 20, SortOrder.getDefault(), 0, null, false, null, false, false, 0);
+        ColumnExpression colExp = new KeyValueColumnExpression(column, true);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dOut = new DataOutputStream(baos);
         colExp.write(dOut);
@@ -61,8 +61,8 @@ public class ColumnExpressionTest {
     public void testSerializationWithNullScale() throws Exception {
         int maxLen = 30;
         PColumn column = new PColumnImpl(PNameFactory.newName("c1"), PNameFactory.newName("f1"), PBinary.INSTANCE, maxLen, null,
-                true, 20, SortOrder.getDefault(), 0, null, false, null, false, false);
-        ColumnExpression colExp = new KeyValueColumnExpression(column);
+                true, 20, SortOrder.getDefault(), 0, null, false, null, false, false, 0);
+        ColumnExpression colExp = new KeyValueColumnExpression(column, true);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dOut = new DataOutputStream(baos);
         colExp.write(dOut);
@@ -81,8 +81,8 @@ public class ColumnExpressionTest {
     public void testSerializationWithNullMaxLength() throws Exception {
         int scale = 5;
         PColumn column = new PColumnImpl(PNameFactory.newName("c1"), PNameFactory.newName("f1"), PVarchar.INSTANCE, null, scale,
-                true, 20, SortOrder.getDefault(), 0, null, false, null, false, false);
-        ColumnExpression colExp = new KeyValueColumnExpression(column);
+                true, 20, SortOrder.getDefault(), 0, null, false, null, false, false, 0);
+        ColumnExpression colExp = new KeyValueColumnExpression(column, true);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dOut = new DataOutputStream(baos);
         colExp.write(dOut);
@@ -100,8 +100,8 @@ public class ColumnExpressionTest {
     @Test
     public void testSerializationWithNullScaleAndMaxLength() throws Exception {
         PColumn column = new PColumnImpl(PNameFactory.newName("c1"), PNameFactory.newName("f1"), PDecimal.INSTANCE, null, null, true,
-                20, SortOrder.getDefault(), 0, null, false, null, false, false);
-        ColumnExpression colExp = new KeyValueColumnExpression(column);
+                20, SortOrder.getDefault(), 0, null, false, null, false, false, 0);
+        ColumnExpression colExp = new KeyValueColumnExpression(column, true);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dOut = new DataOutputStream(baos);
         colExp.write(dOut);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 435f0fe..e7e03d6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -135,8 +135,7 @@ public class IndexMaintainerTest  extends BaseConnectionlessQueryTest {
             }
             ValueGetter valueGetter = newValueGetter(row, valueMap);
             
-            List<Mutation> indexMutations =
-                    IndexTestUtil.generateIndexData(index, table, dataMutation, ptr, builder);
+            List<Mutation> indexMutations = IndexTestUtil.generateIndexData(index, table, dataMutation, ptr, builder);
             assertEquals(1,indexMutations.size());
             assertTrue(indexMutations.get(0) instanceof Put);
             Mutation indexMutation = indexMutations.get(0);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
index f53e871..cb9c640 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/AggregateResultScannerTest.java
@@ -137,7 +137,11 @@ public class AggregateResultScannerTest extends BaseConnectionlessQueryTest {
 			public boolean isDynamic() {
 				return false;
 			}
-        })), null);
+            @Override
+            public Integer getColumnQualifier() {
+                return null;
+            }
+        }, false)), null);
         aggregationManager.setAggregators(new ClientAggregators(Collections.<SingleAggregateFunction>singletonList(func), 1));
         ResultIterators iterators = new ResultIterators() {
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
index 452ea4d..0844d9b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseConnectionlessQueryTest.java
@@ -125,15 +125,15 @@ public class BaseConnectionlessQueryTest extends BaseTest {
         try {
             PTable table = conn.getTable(new PTableKey(null, ATABLE_NAME));
             ATABLE = table;
-            ORGANIZATION_ID = new ColumnRef(new TableRef(table), table.getColumn("ORGANIZATION_ID").getPosition()).newColumnExpression();
-            ENTITY_ID = new ColumnRef(new TableRef(table), table.getColumn("ENTITY_ID").getPosition()).newColumnExpression();
-            A_INTEGER = new ColumnRef(new TableRef(table), table.getColumn("A_INTEGER").getPosition()).newColumnExpression();
-            A_STRING = new ColumnRef(new TableRef(table), table.getColumn("A_STRING").getPosition()).newColumnExpression();
-            B_STRING = new ColumnRef(new TableRef(table), table.getColumn("B_STRING").getPosition()).newColumnExpression();
-            A_DATE = new ColumnRef(new TableRef(table), table.getColumn("A_DATE").getPosition()).newColumnExpression();
-            A_TIME = new ColumnRef(new TableRef(table), table.getColumn("A_TIME").getPosition()).newColumnExpression();
-            A_TIMESTAMP = new ColumnRef(new TableRef(table), table.getColumn("A_TIMESTAMP").getPosition()).newColumnExpression();
-            X_DECIMAL = new ColumnRef(new TableRef(table), table.getColumn("X_DECIMAL").getPosition()).newColumnExpression();
+            ORGANIZATION_ID = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("ORGANIZATION_ID").getPosition()).newColumnExpression();
+            ENTITY_ID = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("ENTITY_ID").getPosition()).newColumnExpression();
+            A_INTEGER = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_INTEGER").getPosition()).newColumnExpression();
+            A_STRING = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_STRING").getPosition()).newColumnExpression();
+            B_STRING = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("B_STRING").getPosition()).newColumnExpression();
+            A_DATE = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_DATE").getPosition()).newColumnExpression();
+            A_TIME = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_TIME").getPosition()).newColumnExpression();
+            A_TIMESTAMP = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("A_TIMESTAMP").getPosition()).newColumnExpression();
+            X_DECIMAL = new ColumnRef(new TableRef(table), table.getPColumnForColumnName("X_DECIMAL").getPosition()).newColumnExpression();
         } finally {
             conn.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
index 3396cf8..a86cc8d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ConnectionlessTest.java
@@ -142,15 +142,14 @@ public class ConnectionlessTest {
         assertTrue(iterator.hasNext());
         kv = iterator.next();
         assertArrayEquals(expectedRowKey1, kv.getRow());        
-        assertEquals(name1, PVarchar.INSTANCE.toObject(kv.getValue()));
-        assertTrue(iterator.hasNext());
+        assertEquals(QueryConstants.EMPTY_COLUMN_VALUE, PVarchar.INSTANCE.toObject(kv.getValue()));
         kv = iterator.next();
         assertArrayEquals(expectedRowKey1, kv.getRow());        
-        assertEquals(now, PDate.INSTANCE.toObject(kv.getValue()));
+        assertEquals(name1, PVarchar.INSTANCE.toObject(kv.getValue()));
         assertTrue(iterator.hasNext());
         kv = iterator.next();
         assertArrayEquals(expectedRowKey1, kv.getRow());        
-        assertEquals(QueryConstants.EMPTY_COLUMN_VALUE, PVarchar.INSTANCE.toObject(kv.getValue()));
+        assertEquals(now, PDate.INSTANCE.toObject(kv.getValue()));
     }
 
     @SuppressWarnings("deprecation")
@@ -159,15 +158,15 @@ public class ConnectionlessTest {
         assertTrue(iterator.hasNext());
         kv = iterator.next();
         assertArrayEquals(expectedRowKey2, kv.getRow());        
-        assertEquals(name2, PVarchar.INSTANCE.toObject(kv.getValue()));
-        assertTrue(iterator.hasNext());
+        assertEquals(QueryConstants.EMPTY_COLUMN_VALUE, PVarchar.INSTANCE.toObject(kv.getValue()));
         kv = iterator.next();
         assertArrayEquals(expectedRowKey2, kv.getRow());        
-        assertEquals(now, PDate.INSTANCE.toObject(kv.getValue()));
+        assertEquals(name2, PVarchar.INSTANCE.toObject(kv.getValue()));
         assertTrue(iterator.hasNext());
         kv = iterator.next();
         assertArrayEquals(expectedRowKey2, kv.getRow());        
-        assertEquals(QueryConstants.EMPTY_COLUMN_VALUE, PVarchar.INSTANCE.toObject(kv.getValue()));
+        assertEquals(now, PDate.INSTANCE.toObject(kv.getValue()));
+        assertFalse(iterator.hasNext());
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c73c160..77417b5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -257,11 +257,11 @@ public class TestUtil {
     }
 
     public static Expression constantComparison(CompareOp op, PColumn c, Object o) {
-        return  new ComparisonExpression(Arrays.<Expression>asList(new KeyValueColumnExpression(c), LiteralExpression.newConstant(o)), op);
+        return  new ComparisonExpression(Arrays.<Expression>asList(new KeyValueColumnExpression(c, true), LiteralExpression.newConstant(o)), op);
     }
 
     public static Expression kvColumn(PColumn c) {
-        return new KeyValueColumnExpression(c);
+        return new KeyValueColumnExpression(c, true);
     }
 
     public static Expression pkColumn(PColumn c, List<PColumn> columns) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-protocol/src/main/PTable.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/PTable.proto b/phoenix-protocol/src/main/PTable.proto
index 09bdeb6..ae34695 100644
--- a/phoenix-protocol/src/main/PTable.proto
+++ b/phoenix-protocol/src/main/PTable.proto
@@ -49,6 +49,7 @@ message PColumn {
   optional string expression = 12;
   optional bool isRowTimestamp = 13;
   optional bool isDynamic = 14;
+  optional int32 columnQualifier = 15;
 }
 
 message PTableStats {
@@ -90,4 +91,5 @@ message PTable {
   optional bool transactional = 27;
   optional int64 updateCacheFrequency = 28;
   optional int64 indexDisableTimestamp = 29;
+  optional bytes storageScheme = 30;
 }
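
The two new optional fields above carry the per-column qualifier and the table-level storage scheme across the wire. As a hedged illustration only (the generated class and builder names below are assumptions based on standard protobuf-java codegen, not part of this commit), setting and reading the new column field would look roughly like:

    // Assumed generated API (standard protobuf-java builders); illustrative only.
    PTableProtos.PColumn.Builder builder = PTableProtos.PColumn.newBuilder();
    builder.setColumnQualifier(1);                           // new optional int32 field, tag 15
    PTableProtos.PColumn colProto = builder.buildPartial();  // buildPartial(): required fields omitted here
    Integer cq = colProto.hasColumnQualifier() ? colProto.getColumnQualifier() : null;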


[2/3] phoenix git commit: Replace empty key value with unit tests passing

Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index b8b0350..4767090 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.SchemaUtil;
 
 /**
  * When selecting specific columns in a SELECT query, this filter passes only selected columns
@@ -53,6 +53,8 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
     private byte[] emptyCFName;
     private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker;
     private Set<byte[]> conditionOnlyCfs;
+    private boolean usesEncodedColumnNames;
+    private byte[] emptyKVQualifier;
 
     public ColumnProjectionFilter() {
 
@@ -60,10 +62,12 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
 
     public ColumnProjectionFilter(byte[] emptyCFName,
             Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker,
-            Set<byte[]> conditionOnlyCfs) {
+            Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) {
         this.emptyCFName = emptyCFName;
         this.columnsTracker = columnsTracker;
         this.conditionOnlyCfs = conditionOnlyCfs;
+        this.usesEncodedColumnNames = usesEncodedColumnNames;
+        this.emptyKVQualifier = SchemaUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
     }
 
     @Override
@@ -87,6 +91,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
             familyMapSize--;
         }
         int conditionOnlyCfsSize = WritableUtils.readVInt(input);
+        usesEncodedColumnNames = conditionOnlyCfsSize > 0;
+        emptyKVQualifier = SchemaUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+        conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore to the actual value.
         this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
         while (conditionOnlyCfsSize > 0) {
             this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input));
@@ -110,12 +117,13 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
                 }
             }
         }
-        // Write conditionOnlyCfs
-        WritableUtils.writeVInt(output, this.conditionOnlyCfs.size());
+        // Encode usesEncodedColumnNames in conditionOnlyCfs size.
+        WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * (usesEncodedColumnNames ? 1 : -1));
         for (byte[] f : this.conditionOnlyCfs) {
             WritableUtils.writeCompressedByteArray(output, f);
         }
-    }
+    }
+
 
     @Override
     public byte[] toByteArray() throws IOException {
@@ -153,9 +161,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
         // make sure we're not holding to any of the byte[]'s
         ptr.set(HConstants.EMPTY_BYTE_ARRAY);
         if (kvs.isEmpty()) {
-            kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName,
-                    0, this.emptyCFName.length, QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                    QueryConstants.EMPTY_COLUMN_BYTES.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
+            kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(),
+                    this.emptyCFName, 0, this.emptyCFName.length, emptyKVQualifier, 0,
+                    emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
         }
     }
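
The write/readFields pair above avoids adding a new serialized field, presumably to keep the Writable format compatible with older clients, by folding the usesEncodedColumnNames flag into the vint that already carries the conditionOnlyCfs size. A minimal standalone sketch of that round trip, with hypothetical helper names:

    // Hypothetical helpers illustrating the sign-encoding used above: the size is shifted
    // by one so that an empty set is still representable, and the sign carries the flag.
    static int encodeSizeAndFlag(int size, boolean usesEncodedColumnNames) {
        return (size + 1) * (usesEncodedColumnNames ? 1 : -1);
    }

    static boolean decodeFlag(int encoded) {
        return encoded > 0;
    }

    static int decodeSize(int encoded) {
        return Math.abs(encoded) - 1; // restore the actual size
    }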
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index 569faa5..8d03797 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -184,7 +184,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
         ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
             @Override
             public Void visit(KeyValueColumnExpression expression) {
-                inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnName());
+                inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier());
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
index eaf8d35..07f7072 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
@@ -58,7 +58,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
             @Override
             public Void visit(KeyValueColumnExpression expression) {
                 cf = expression.getColumnFamily();
-                cq = expression.getColumnName();
+                cq = expression.getColumnQualifier();
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 13ad7e5..7e6d0f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -299,6 +299,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
     private boolean rowKeyOrderOptimizable;
+    private boolean usesEncodedColumnNames;
+    private ImmutableBytesPtr emptyKeyValueQualifierPtr;
     
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
@@ -312,14 +314,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
-
+        /*
+         * There is nothing to prevent new indexes on existing tables from having encoded column names.
+         * However, for backward compatibility reasons, we cannot change IndexMaintainer and the state
+         * that is serialized in it. Because of this, indexes are forced to inherit the
+         * storage scheme of their parent data tables.
+         */
+        this.usesEncodedColumnNames = SchemaUtil.usesEncodedColumnNames(index);
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
         Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum();
         boolean indexWALDisabled = index.isWALDisabled();
         int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1);
-//        int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
         int nIndexColumns = index.getColumns().size() - indexPosOffset;
         int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset;
         // number of expressions that are indexed that are not present in the row key of the data table
@@ -330,7 +337,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName);
             String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
             try {
-                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName);
+                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName);
                 if (SchemaUtil.isPKColumn(dataColumn)) 
                     continue;
             } catch (ColumnNotFoundException e) {
@@ -404,7 +411,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 throw new RuntimeException(e); // Impossible
             }
             if ( expressionIndexCompiler.getColumnRef()!=null ) {
-            	// get the column of the data table that corresponds to this index column
+            	// get the data table column that corresponds to this index column
 	            PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
 	            boolean isPKColumn = SchemaUtil.isPKColumn(column);
 	            if (isPKColumn) {
@@ -429,7 +436,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             PColumnFamily family = index.getColumnFamilies().get(i);
             for (PColumn indexColumn : family.getColumns()) {
                 PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
-                this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes()));
+                byte[] cq = SchemaUtil.getColumnQualifier(column, index);
+                this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), cq));
             }
         }
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
@@ -749,7 +757,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // same for all rows in this index)
             if (!viewConstantColumnBitSet.get(i)) {
                 int pos = rowKeyMetaData.getIndexPkPosition(i-dataPosOffset);
-                Field dataField = dataRowKeySchema.getField(i);
                 indexFields[pos] = 
                         dataRowKeySchema.getField(i);
             } 
@@ -843,9 +850,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+                this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts,
                 // set the value to the empty column name
-                QueryConstants.EMPTY_COLUMN_BYTES_PTR));
+                emptyKeyValueQualifierPtr));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
         int i = 0;
@@ -1058,6 +1065,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         
         if (isNewClient) {
             int numIndexedExpressions = WritableUtils.readVInt(input);
+            usesEncodedColumnNames = numIndexedExpressions > 0;
+            numIndexedExpressions = Math.abs(numIndexedExpressions) - 1;
             indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
             for (int i = 0; i < numIndexedExpressions; i++) {
             	Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
@@ -1147,7 +1156,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
         output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength());
         
-        WritableUtils.writeVInt(output, indexedExpressions.size());
+        // Hack to encode usesEncodedColumnNames in indexedExpressions size.
+        int indexedExpressionsSize = (indexedExpressions.size() + 1) * (usesEncodedColumnNames ? 1 : -1);
+        WritableUtils.writeVInt(output, indexedExpressionsSize);
         for (Expression expression : indexedExpressions) {
         	WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
         	expression.write(output);
@@ -1204,10 +1215,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * Init calculated state reading/creating
      */
     private void initCachedState() {
-        dataEmptyKeyValueRef =
-                new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(),
-                        QueryConstants.EMPTY_COLUMN_BYTES);
-
+        dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), SchemaUtil
+                .getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst());
+        emptyKeyValueQualifierPtr = new ImmutableBytesPtr(SchemaUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst());
         indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size());
         for (ColumnReference ref : coveredColumns) {
             indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName(
@@ -1221,7 +1231,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         	KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                	if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) {
+                	if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) {
                 		indexedColumnTypes.add(expression.getDataType());
                 	}
                     return null;
@@ -1486,4 +1496,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             return udfParseNodes;
         }
     }
+    
+    public byte[] getEmptyKeyValueQualifier() {
+        return emptyKeyValueQualifierPtr.copyBytes();
+    }
 }
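
Covered columns are now referenced by the qualifier that is actually stored in HBase rather than by the column name, via SchemaUtil.getColumnQualifier(column, index). The SchemaUtil changes themselves are not part of these hunks, so the following is only a hedged sketch of what such a helper could plausibly do, assuming it returns the encoded integer qualifier for tables using encoded column names and falls back to the column name bytes otherwise:

    // Sketch only; not the committed SchemaUtil implementation.
    public static byte[] getColumnQualifier(PColumn column, PTable table) {
        if (SchemaUtil.usesEncodedColumnNames(table) && column.getColumnQualifier() != null) {
            return PInteger.INSTANCE.toBytes(column.getColumnQualifier()); // encoded scheme
        }
        return column.getName().getBytes(); // legacy scheme: the qualifier is the column name
    }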

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 32dc632..6ee981a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -221,10 +221,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 Scan scan = new Scan();
                 // Project all mutable columns
                 for (ColumnReference ref : mutableColumns) {
+                    //TODO: samarth confirm this is ok
                     scan.addColumn(ref.getFamily(), ref.getQualifier());
                 }
+                // Indexes inherit the storage scheme of the data table, which means all the indexes
+                // share the same storage scheme and empty key value qualifier.
+                byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+                
                 // Project empty key value column
-                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
                 ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
                 scanRanges.initializeScan(scan);
                 TableName tableName = env.getRegion().getRegionInfo().getTable();
@@ -262,7 +267,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
         if (scanner != null) {
             Result result;
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+                    .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             // Process existing data table rows by removing the old index row and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
@@ -290,7 +296,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             // to generate point delete markers for all index rows that were added. We don't have Tephra
             // manage index rows in change sets because we don't want to be hit with the additional
             // memory hit and do not need to do conflict detection on index rows.
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
                 // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c36a72b..eabaaa3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -204,7 +204,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             // Project empty key value unless the column family containing it has
                             // been projected in its entirety.
                             if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
-                                scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                                scan.addColumn(ecf, SchemaUtil.getEmptyKeyValueInfo(table).getFirst());
                             }
                         }
                     }
@@ -297,6 +297,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         if (whereCol.getSecond() == null) {
                             scan.addFamily(family);                            
                         } else {
+                            //TODO: samarth confirm this
                             scan.addColumn(family, whereCol.getSecond());
                         }
                     }
@@ -321,7 +322,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             // the ExplicitColumnTracker not to be used, though.
             if (!statement.isAggregate() && filteredColumnNotInProjection) {
                 ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                        columnsTracker, conditionOnlyCfs));
+                        columnsTracker, conditionOnlyCfs, SchemaUtil.usesEncodedColumnNames(table)));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index f875a77..c11b472 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -202,6 +202,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
     public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
     public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP);
+    public static final String COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+    public static final byte[] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER);
 
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
@@ -290,6 +292,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] UPDATE_CACHE_FREQUENCY_BYTES = Bytes.toBytes(UPDATE_CACHE_FREQUENCY);
 
     public static final String ASYNC_CREATED_DATE = "ASYNC_CREATED_DATE";
+    public static final String STORAGE_SCHEME = "STORAGE_SCHEME";
+    public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(STORAGE_SCHEME);
 
     private final PhoenixConnection connection;
     private final ResultSet emptyResultSet;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index ff73530..404fb06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -279,7 +279,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
         final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
 
-        job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+        job.getConfiguration().set(FormatToBytesWritableMapper.PHYSICAL_TABLE_NAMES_CONFKEY,tableNamesAsJson);
         job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
 
         // give subclasses their hook

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 2c9b6d9..b975a03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
+
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
@@ -44,12 +45,12 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +87,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
 
     /** Configuration key for the table names */
-    public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+    public static final String PHYSICAL_TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
 
     /** Configuration key for the table logical names */
     public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
@@ -101,8 +102,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     protected PhoenixConnection conn;
     protected UpsertExecutor<RECORD, ?> upsertExecutor;
     protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
-    protected List<String> tableNames;
-    protected List<String> logicalNames;
+    //TODO: merge physicalTableNames and logicalTables into one associative data structure since they are 1:1.
+    protected List<String> physicalTableNames;
+    protected List<Pair<String, byte[]>> logicalTables; // List of pairs of a table's logical name and the empty key value's qualifier used for it.
     protected MapperUpsertListener<RECORD> upsertListener;
 
     /*
@@ -127,11 +129,14 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
         try {
             conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
 
-            final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+            final String tableNamesConf = conf.get(PHYSICAL_TABLE_NAMES_CONFKEY);
             final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
-            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
+            physicalTableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+            List<String> logicalTableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+            for (String logicalTableName : logicalTableNames) {
+                PTable table = PhoenixRuntime.getTable(conn, logicalTableName);
+                logicalTables.add(new Pair<>(logicalTableName, SchemaUtil.getEmptyKeyValueInfo(table).getFirst()));
+            }
             columnIndexes = initColumnIndexes();
         } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
@@ -143,7 +148,6 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
         preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     protected void map(LongWritable key, Text value, Context context) throws IOException,
             InterruptedException {
@@ -173,8 +177,8 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
                 keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
                 byte[] first = kvPair.getFirst();
                 // Create a list of KV for each table
-                for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                for (int i = 0; i < physicalTableNames.size(); i++) {
+                    if (Bytes.compareTo(Bytes.toBytes(physicalTableNames.get(i)), first) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<KeyValue>());
                         }
@@ -201,8 +205,8 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
         List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
         int tableIndex;
-        for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
-            PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
+        for (tableIndex = 0; tableIndex < physicalTableNames.size(); tableIndex++) {
+            PTable table = PhoenixRuntime.getTable(conn, logicalTables.get(tableIndex).getFirst());
             Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
             List<PColumn> cls = table.getColumns();
             for (int i = 0; i < cls.size(); i++) {
@@ -247,7 +251,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
      * Collect all column values for the same rowKey
      *
      * @param context    Current mapper context
-     * @param tableIndex Table index in tableNames list
+     * @param tableIndex Table index in the physicalTableNames and logicalTables lists
      * @param lkv        List of KV values that will be combined in a single ImmutableBytesWritable
      * @throws IOException
      * @throws InterruptedException
@@ -258,12 +262,14 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
         DataOutputStream outputStream = new DataOutputStream(bos);
         ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+        String tableName = physicalTableNames.get(tableIndex);
+        Pair<String, byte[]> logicalTable = logicalTables.get(tableIndex);
         if (!lkv.isEmpty()) {
             // All Key Values for the same row are supposed to be the same, so init rowKey only once
             Cell first = lkv.get(0);
             outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
             for (KeyValue cell : lkv) {
-                if (isEmptyCell(cell)) {
+                if (isEmptyCell(cell, logicalTable.getSecond())) {
                     continue;
                 }
                 int i = findIndex(tableIndex, cell);
@@ -278,13 +284,13 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
         }
         ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
         outputStream.close();
-        context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
+        context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
     }
 
-    protected boolean isEmptyCell(KeyValue cell) {
+    protected boolean isEmptyCell(KeyValue cell, byte[] emptyKVQualifier) {
         if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
-                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
+                cell.getQualifierLength(), emptyKVQualifier, 0,
+                emptyKVQualifier.length) != 0)
             return false;
         else
             return true;
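
The reworked isEmptyCell above keeps the original if/else shape; an equivalent, slightly more direct form of the same comparison (a sketch, not the committed code) is:

    protected boolean isEmptyCell(KeyValue cell, byte[] emptyKVQualifier) {
        // The cell is the empty key value iff its qualifier matches the table's
        // empty-KV qualifier (legacy or encoded, as resolved per table in setup()).
        return Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
                cell.getQualifierLength(), emptyKVQualifier, 0, emptyKVQualifier.length) == 0;
    }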

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index e906431..04fe959 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -80,7 +80,7 @@ public class FormatToKeyValueReducer
         try {
             PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
             builder = conn.getKeyValueBuilder();
-            final String tableNamesConf = conf.get(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY);
+            final String tableNamesConf = conf.get(FormatToBytesWritableMapper.PHYSICAL_TABLE_NAMES_CONFKEY);
             final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
             tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
             logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
@@ -98,16 +98,16 @@ public class FormatToKeyValueReducer
             PTable table = PhoenixRuntime.getTable(conn, tableName);
             emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
             List<PColumn> cls = table.getColumns();
-            List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
+            List<Pair<byte[], byte[]>> list = new ArrayList<>(cls.size());
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
-                if (c.getFamilyName() == null) {
+                if (SchemaUtil.isPKColumn(c)) {
                     list.add(null); // Skip PK column
                     continue;
                 }
                 byte[] family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                list.add(new Pair(family, name));
+                byte[] name = SchemaUtil.getColumnQualifier(c, table);
+                list.add(new Pair<>(family, name));
             }
             columnIndexes.add(list);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index a8595e9..a9f1e87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -569,6 +569,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
     
+    @Override
     public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
     	synchronized (latestMetaDataLock) {
             throwConnectionClosedIfNullMetaData();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 471ac73..adbbdfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -29,6 +29,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
@@ -82,6 +83,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
@@ -115,6 +117,7 @@ import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 
 
@@ -176,11 +179,18 @@ public interface QueryConstants {
     public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME);
     public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_BYTES);
+    public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0;
+    public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = PInteger.INSTANCE.toBytes(ENCODED_EMPTY_COLUMN_NAME);
+    public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
+            ENCODED_EMPTY_COLUMN_BYTES);
     public final static String EMPTY_COLUMN_VALUE = "x";
     public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
     public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_VALUE_BYTES);
-
+    public static final String ENCODED_EMPTY_COLUMN_VALUE = EMPTY_COLUMN_VALUE;
+    public final static byte[] ENCODED_EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
+    public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
+            ENCODED_EMPTY_COLUMN_VALUE_BYTES);
     public static final String DEFAULT_COLUMN_FAMILY = "0";
     public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
     public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
@@ -259,6 +269,8 @@ public interface QueryConstants {
             IS_ROW_TIMESTAMP + " BOOLEAN, " +
             TRANSACTIONAL + " BOOLEAN," +
             UPDATE_CACHE_FREQUENCY + " BIGINT," +
+            COLUMN_QUALIFIER + " INTEGER," +
+            STORAGE_SCHEME + " TINYINT, " + 
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +
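
These constants back the SchemaUtil.getEmptyKeyValueInfo(...) calls used throughout this change set; the helper itself is not included in these hunks. A plausible sketch, under the assumption that it returns a (qualifier, value) pair chosen by whether the table uses encoded column names:

    // Assumption: getFirst() is the empty key value's qualifier and getSecond() its value,
    // matching how callers in this change set use the result. Sketch only.
    public static Pair<byte[], byte[]> getEmptyKeyValueInfo(boolean usesEncodedColumnNames) {
        return usesEncodedColumnNames
                ? new Pair<byte[], byte[]>(QueryConstants.ENCODED_EMPTY_COLUMN_BYTES,
                        QueryConstants.ENCODED_EMPTY_COLUMN_VALUE_BYTES)
                : new Pair<byte[], byte[]>(QueryConstants.EMPTY_COLUMN_BYTES,
                        QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
    }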

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 76f6218..d7c6456 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -45,7 +45,7 @@ public class ColumnRef {
     }
 
     public ColumnRef(TableRef tableRef, String familyName, String columnName) throws MetaDataEntityNotFoundException {
-        this(tableRef, tableRef.getTable().getColumnFamily(familyName).getColumn(columnName).getPosition());
+        this(tableRef, tableRef.getTable().getColumnFamily(familyName).getPColumnForColumnName(columnName).getPosition());
     }
 
     public ColumnRef(TableRef tableRef, int columnPosition) {
@@ -109,7 +109,7 @@ public class ColumnRef {
         	return new ProjectedColumnExpression(column, table, displayName);
         }
        
-        return new KeyValueColumnExpression(column, displayName);
+        return new KeyValueColumnExpression(column, displayName, SchemaUtil.usesEncodedColumnNames(table));
     }
 
     public ColumnRef cloneAtTimestamp(long timestamp) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index a60229e..65e362c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -90,4 +90,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
 	public boolean isDynamic() {
 		return getDelegate().isDynamic();
 	}
+
+    @Override
+    public Integer getColumnQualifier() {
+        return getDelegate().getColumnQualifier();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index b294f03..de72c48 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -97,8 +97,8 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
-        return delegate.getColumn(name);
+    public PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+        return delegate.getPColumnForColumnName(name);
     }
 
     @Override
@@ -271,4 +271,14 @@ public class DelegateTable implements PTable {
     public long getUpdateCacheFrequency() {
         return delegate.getUpdateCacheFrequency();
     }
+
+    @Override
+    public StorageScheme getStorageScheme() {
+        return delegate.getStorageScheme();
+    }
+
+    @Override
+    public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+        return delegate.getPColumnForColumnQualifier(cq);
+    }
 }
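
The new getPColumnForColumnQualifier(byte[]) entry point lets callers resolve a column directly from the qualifier of a scanned cell. A small hypothetical usage sketch (the helper below is illustrative, not committed code):

    // Resolve the PColumn for a scanned cell via the new qualifier-based lookup.
    static PColumn resolveColumn(PTable table, Cell cell)
            throws ColumnNotFoundException, AmbiguousColumnException {
        byte[] cq = CellUtil.cloneQualifier(cell); // org.apache.hadoop.hbase.CellUtil
        return table.getPColumnForColumnQualifier(cq);
    }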

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 7f3f850..455e2b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -33,6 +33,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
@@ -65,6 +66,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
@@ -180,6 +182,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.types.PDataType;
@@ -248,8 +251,9 @@ public class MetaDataClient {
             STORE_NULLS + "," +
             BASE_COLUMN_COUNT + "," +
             TRANSACTIONAL + "," +
-            UPDATE_CACHE_FREQUENCY +
-            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+            UPDATE_CACHE_FREQUENCY + ", " +
+            STORAGE_SCHEME +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String CREATE_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
             TENANT_ID + "," +
@@ -321,8 +325,9 @@ public class MetaDataClient {
         PK_NAME + "," +  // write this both in the column and table rows for access by metadata APIs
         KEY_SEQ + "," +
         COLUMN_DEF + "," +
-        IS_ROW_TIMESTAMP + 
-        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+        COLUMN_QUALIFIER + "," +
+        IS_ROW_TIMESTAMP +
+        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String INSERT_COLUMN_ALTER_TABLE =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
             TENANT_ID + "," +
@@ -342,8 +347,9 @@ public class MetaDataClient {
             IS_VIEW_REFERENCED + "," +
             PK_NAME + "," +  // write this both in the column and table rows for access by metadata APIs
             KEY_SEQ + "," +
-            COLUMN_DEF +
-            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+            COLUMN_DEF + ", " +
+            COLUMN_QUALIFIER +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String UPDATE_COLUMN_POSITION =
         "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\" ( " +
         TENANT_ID + "," +
@@ -670,10 +676,10 @@ public class MetaDataClient {
                     byte[] cf= colRef.getFamily();
                     byte[] cq= colRef.getQualifier();
                     if (cf!=null) {
-                        view.getColumnFamily(cf).getColumn(cq);
+                        view.getColumnFamily(cf).getPColumnForColumnQualifier(cq);
                     }
                     else {
-                        view.getColumn( Bytes.toString(cq));
+                        view.getPColumnForColumnQualifier(cq);
                     }
                 } catch (ColumnNotFoundException e) { // Ignore this index and continue with others
                     containsAllReqdCols = false;
@@ -689,7 +695,7 @@ public class MetaDataClient {
                         // but the WHERE clause for the view statement (which is added to the index below)
                         // would fail to compile.
                         String indexColumnName = IndexUtil.getIndexColumnName(col);
-                        index.getColumn(indexColumnName);
+                        index.getPColumnForColumnName(indexColumnName);
                     } catch (ColumnNotFoundException e) { // Ignore this index and continue with others
                         containsAllReqdCols = false;
                         break;
@@ -747,8 +753,13 @@ public class MetaDataClient {
         } else {
             colUpsert.setString(18, column.getExpressionStr());
         }
-        if (colUpsert.getParameterMetaData().getParameterCount() > 18) {
-            colUpsert.setBoolean(19, column.isRowTimestamp());
+        if (column.getColumnQualifier() == null) {
+            colUpsert.setNull(19, Types.INTEGER);
+        } else {
+            colUpsert.setInt(19, column.getColumnQualifier());
+        }
+        if (colUpsert.getParameterMetaData().getParameterCount() > 19) {
+            colUpsert.setBoolean(20, column.isRowTimestamp());
         }
         colUpsert.execute();
     }
@@ -767,7 +778,7 @@ public class MetaDataClient {
         argUpsert.execute();
     }
 
-    private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK) throws SQLException {
+    private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK, Map<String, Integer> nextColumnQualifiers) throws SQLException {
         try {
             ColumnName columnDefName = def.getColumnDefName();
             SortOrder sortOrder = def.getSortOrder();
@@ -815,15 +826,23 @@ public class MetaDataClient {
                 }
                 isNull = false;
             }
-
+            Integer columnQualifier = null;
+            if (!isPK && nextColumnQualifiers != null) {
+                columnQualifier = nextColumnQualifiers.get(familyName.getString());
+                if (columnQualifier == null) {
+                    // We use columnQualifier 0 for the special empty key value.
+                    columnQualifier = 1;
+                }
+                nextColumnQualifiers.put(familyName.getString(), columnQualifier + 1);
+            }
             PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false);
+                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false, columnQualifier);
             return column;
         } catch (IllegalArgumentException e) { // Based on precondition check in constructor
             throw new SQLException(e);
         }
     }
-
+    
     public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, byte[][] viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
         PTable table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, null, null, null);
         if (table == null || table.getType() == PTableType.VIEW || table.isTransactional()) {
@@ -1683,7 +1702,7 @@ public class MetaDataClient {
                 .build().buildException();
             }
             // can't create a transactional table if it has a row timestamp column
-            if (pkConstraint.getNumColumnsWithRowTimestamp()>0 && transactional) {
+            if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && transactional) {
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP)
                 .setSchemaName(schemaName).setTableName(tableName)
                 .build().buildException();
@@ -1843,6 +1862,39 @@ public class MetaDataClient {
             int pkPositionOffset = pkColumns.size();
             int position = positionOffset;
             
+            StorageScheme storageScheme = null;
+            Map<String, Integer> nextColumnQualifiers = null; // stays null when column names are not encoded, e.g. for tables created before Phoenix 4.8.
+            if (SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))) {
+                // System tables have hard-coded column qualifiers. So we can't use column encoding for them.
+                storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES;
+            } else if (parent != null && tableType == PTableType.VIEW) {
+                // We can't control which column qualifiers are used in the HBase table backing a mapped
+                // Phoenix view, so we are not able to encode column names.
+                if (viewType != null && viewType == MAPPED) {
+                    storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES;
+                } else {
+                    // For regular Phoenix views, use the parent's storage scheme since they all share the
+                    // parent's HTable.
+                    storageScheme = parent.getStorageScheme();
+                    if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) {
+                        nextColumnQualifiers = SchemaUtil.getNextColumnQualifiers(parent);
+                    }
+                }
+            } else if (parent != null && tableType == PTableType.INDEX) {
+                // New indexes on existing tables could use encoded column names. Unfortunately, for backward
+                // compatibility reasons, we aren't able to change IndexMaintainer and the state that is
+                // serialized in it. Because of this, indexes are forced to inherit the storage scheme of
+                // their parent data tables.
+                storageScheme = parent.getStorageScheme();
+                if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) {
+                    nextColumnQualifiers = SchemaUtil.getNextColumnQualifiers(parent);
+                }
+            } else {
+                // we always attempt to create tables with encoded column names.
+                storageScheme = StorageScheme.ENCODED_COLUMN_NAMES;
+                nextColumnQualifiers = Maps.newHashMapWithExpectedSize(colDefs.size() - pkColumns.size());
+            }
+            
             for (ColumnDef colDef : colDefs) {
                 rowTimeStampColumnAlreadyFound = checkAndValidateRowTimestampCol(colDef, pkConstraint, rowTimeStampColumnAlreadyFound, tableType);
                 if (colDef.isPK()) { // i.e. the column is declared as CREATE TABLE COLNAME DATATYPE PRIMARY KEY...
@@ -1861,7 +1913,7 @@ public class MetaDataClient {
                                 .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                     }
                 }
-                PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false);
+                PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false, nextColumnQualifiers);
                 if (SchemaUtil.isPKColumn(column)) {
                     // TODO: remove this constraint?
                     if (pkColumnsIterator.hasNext() && !column.getName().getString().equals(pkColumnsIterator.next().getFirst().getColumnName())) {
@@ -1972,13 +2024,14 @@ public class MetaDataClient {
             if (SchemaUtil.isMetaTable(schemaName,tableName)) {
                 // TODO: what about stats for system catalog?
                 PName newSchemaName = PNameFactory.newName(schemaName);
+                // Column names and qualifiers are hard-coded for system tables.
                 PTable table = PTableImpl.makePTable(tenantId,newSchemaName, PNameFactory.newName(tableName), tableType,
                         null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                         PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME), null, columns, null, null,
                         Collections.<PTable>emptyList(), isImmutableRows,
                         Collections.<PName>emptyList(), defaultFamilyName == null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L);
+                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L, StorageScheme.NON_ENCODED_COLUMN_NAMES);
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -2038,7 +2091,6 @@ public class MetaDataClient {
                 tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                 connection.rollback();
             }
-
             String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString();
             PIndexState indexState = parent == null || tableType == PTableType.VIEW  ? null : PIndexState.BUILDING;
             PreparedStatement tableUpsert = connection.prepareStatement(CREATE_TABLE);
@@ -2084,6 +2136,7 @@ public class MetaDataClient {
             }
             tableUpsert.setBoolean(21, transactional);
             tableUpsert.setLong(22, updateCacheFrequency);
+            tableUpsert.setByte(23, storageScheme.getSerializedValue()); //TODO: samarth should there be a null check here?
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -2148,7 +2201,7 @@ public class MetaDataClient {
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
                         dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L);
+                        indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, storageScheme);
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
@@ -2701,6 +2754,7 @@ public class MetaDataClient {
                 List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnDefs.size());
                 Set<String> colFamiliesForPColumnsToBeAdded = new LinkedHashSet<>();
                 Set<String> families = new LinkedHashSet<>();
+                Map<String, Integer> nextColumnQualifiers = SchemaUtil.getNextColumnQualifiers(table);
                 if (columnDefs.size() > 0 ) {
                     try (PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN_ALTER_TABLE)) {
                         short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
@@ -2721,7 +2775,7 @@ public class MetaDataClient {
                                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
                                 .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                             }
-                            PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true);
+                            PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, nextColumnQualifiers);
                             columns.add(column);
                             String pkName = null;
                             Short keySeq = null;
@@ -2759,7 +2813,7 @@ public class MetaDataClient {
                                         ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName()));
                                         Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, ++pkSlotPosition));
                                         ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
-                                        PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true);
+                                        PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, nextColumnQualifiers);
                                         addColumnMutation(schemaName, index.getTableName().getString(), indexColumn, colUpsert, index.getParentTableName().getString(), index.getPKName() == null ? null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
                                     }
                                 }
@@ -3034,7 +3088,7 @@ public class MetaDataClient {
                         } 
                         else if (coveredColumns.contains(columnToDropRef)) {
                             String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop);
-                            indexColumnsToDrop.add(index.getColumn(indexColumnName));
+                            indexColumnsToDrop.add(index.getPColumnForColumnName(indexColumnName));
                         }
                     }
                     if(!indexColumnsToDrop.isEmpty()) {
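
For reference, the per-family qualifier assignment added to newColumn() above amounts to the small counter
sketched below. This is an illustrative standalone example, not code from the patch (the class and method
names are made up), following the patch's rule that qualifier 0 is reserved for the empty key value:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only; mirrors the counter logic added to MetaDataClient.newColumn().
    class ColumnQualifierAllocator {
        // Qualifier 0 is reserved for the special empty key value.
        private static final int FIRST_COLUMN_QUALIFIER = 1;

        private final Map<String, Integer> nextColumnQualifiers = new HashMap<>();

        // Returns the qualifier for the next key-value column of the given column family
        // and advances that family's counter.
        int allocate(String family) {
            Integer qualifier = nextColumnQualifiers.get(family);
            if (qualifier == null) {
                qualifier = FIRST_COLUMN_QUALIFIER;
            }
            nextColumnQualifiers.put(family, qualifier + 1);
            return qualifier;
        }
    }

A table with columns CF1.V1, CF2.V2 and CF2.V3 therefore ends up with qualifiers CF1:1, CF2:1 and CF2:2,
which is what the new UpsertValuesIT test later in this commit asserts against a raw scan.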

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
index 0f5fa44..b63c97b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
@@ -27,7 +27,7 @@ package org.apache.phoenix.schema;
 public interface PColumn extends PDatum {
 
     /**
-     * @return the name of the column qualifier
+     * @return the name of the column
      */
     PName getName();
 
@@ -60,4 +60,9 @@ public interface PColumn extends PDatum {
     boolean isRowTimestamp();
     
     boolean isDynamic();
+    
+    /**
+     * @return the HBase column qualifier, or null if the column does not have an encoded qualifier
+     */
+    Integer getColumnQualifier();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
index 24da14d..c4c383e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
@@ -39,16 +39,22 @@ public interface PColumnFamily {
     Collection<PColumn> getColumns();
     
     /**
-     * @return The PColumn for the specified column qualifier.
+     * @return The PColumn for the specified column name.
      * @throws ColumnNotFoundException if the column cannot be found
      */
-    PColumn getColumn(byte[] qualifier) throws ColumnNotFoundException;
+    PColumn getPColumnForColumnNameBytes(byte[] columnNameBytes) throws ColumnNotFoundException;
     
     /**
-     * @return The PColumn for the specified column qualifier.
+     * @return The PColumn for the specified column name.
      * @throws ColumnNotFoundException if the column cannot be found
      */
-    PColumn getColumn(String name) throws ColumnNotFoundException;
+    PColumn getPColumnForColumnName(String columnName) throws ColumnNotFoundException;
     
     int getEstimatedSize();
+    
+    /**
+     * @return The PColumn for the specified column qualifier.
+     * @throws ColumnNotFoundException if the column cannot be found
+     */
+    PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
index 2e29656..a3d855b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Preconditions;
@@ -31,8 +32,9 @@ import com.google.common.collect.ImmutableSortedMap;
 public class PColumnFamilyImpl implements PColumnFamily {
     private final PName name;
     private final List<PColumn> columns;
-    private final Map<String, PColumn> columnByString;
-    private final Map<byte[], PColumn> columnByBytes;
+    private final Map<String, PColumn> columnNamesByStrings;
+    private final Map<byte[], PColumn> columnNamesByBytes;
+    private final Map<byte[], PColumn> columnQualifiersByBytes;
     private final int estimatedSize;
 
     @Override
@@ -40,22 +42,28 @@ public class PColumnFamilyImpl implements PColumnFamily {
         return estimatedSize;
     }
     
-    public PColumnFamilyImpl(PName name, List<PColumn> columns) {
+    public PColumnFamilyImpl(PName name, List<PColumn> columns, boolean useEncodedColumnNames) {
         Preconditions.checkNotNull(name);
         // Include guidePosts also in estimating the size
         long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 5 + SizedUtil.INT_SIZE + name.getEstimatedSize() +
                 SizedUtil.sizeOfMap(columns.size()) * 2 + SizedUtil.sizeOfArrayList(columns.size());
         this.name = name;
         this.columns = ImmutableList.copyOf(columns);
-        ImmutableMap.Builder<String, PColumn> columnByStringBuilder = ImmutableMap.builder();
-        ImmutableSortedMap.Builder<byte[], PColumn> columnByBytesBuilder = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        ImmutableMap.Builder<String, PColumn> columnNamesByStringBuilder = ImmutableMap.builder();
+        ImmutableSortedMap.Builder<byte[], PColumn> columnNamesByBytesBuilder = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        ImmutableSortedMap.Builder<byte[], PColumn> columnQualifiersByBytesBuilder = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
         for (PColumn column : columns) {
             estimatedSize += column.getEstimatedSize();
-            columnByBytesBuilder.put(column.getName().getBytes(), column);
-            columnByStringBuilder.put(column.getName().getString(), column);
+            columnNamesByBytesBuilder.put(column.getName().getBytes(), column);
+            columnNamesByStringBuilder.put(column.getName().getString(), column);
+            // TODO: samarth fix this. Projected columns have a column family for PK columns, which messes up the checks.
+            if (!useEncodedColumnNames || column.getColumnQualifier() != null) {
+                columnQualifiersByBytesBuilder.put(SchemaUtil.getColumnQualifier(column, useEncodedColumnNames), column);
+            }
         }
-        this.columnByBytes = columnByBytesBuilder.build();
-        this.columnByString = columnByStringBuilder.build();
+        this.columnNamesByBytes = columnNamesByBytesBuilder.build();
+        this.columnNamesByStrings = columnNamesByStringBuilder.build();
+        this.columnQualifiersByBytes = useEncodedColumnNames ? columnQualifiersByBytesBuilder.build() : columnNamesByBytes;
         this.estimatedSize = (int)estimatedSize;
     }
     
@@ -70,19 +78,28 @@ public class PColumnFamilyImpl implements PColumnFamily {
     }
 
     @Override
-    public PColumn getColumn(byte[] qualifier) throws ColumnNotFoundException  {
-        PColumn column = columnByBytes.get(qualifier);
+    public PColumn getPColumnForColumnNameBytes(byte[] columnNameBytes) throws ColumnNotFoundException  {
+        PColumn column = columnNamesByBytes.get(columnNameBytes);
         if (column == null) {
-            throw new ColumnNotFoundException(Bytes.toString(qualifier));
+            throw new ColumnNotFoundException(Bytes.toString(columnNameBytes));
         }
         return column;
     }
     
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException  {
-        PColumn column = columnByString.get(name);
+    public PColumn getPColumnForColumnName(String columnName) throws ColumnNotFoundException  {
+        PColumn column = columnNamesByStrings.get(columnName);
         if (column == null) {
-            throw new ColumnNotFoundException(name);
+            throw new ColumnNotFoundException(columnName);
+        }
+        return column;
+    }
+    
+    @Override
+    public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException {
+        PColumn column = columnQualifiersByBytes.get(cq);
+        if (column == null) {
+            throw new ColumnNotFoundException(Bytes.toString(cq));
         }
         return column;
     }
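
Assuming the new lookup methods above, code that previously resolved a cell's column through
PColumnFamily.getColumn(byte[]) now has to distinguish between column-name bytes and qualifier bytes.
A hedged usage sketch follows (the helper class is illustrative only, not part of the patch):

    import java.sql.SQLException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.phoenix.schema.PColumn;
    import org.apache.phoenix.schema.PColumnFamily;
    import org.apache.phoenix.schema.PTable;

    // Illustrative helper, not part of the patch.
    class CellColumnResolver {
        // Resolves the PColumn that a raw HBase Cell belongs to under the new lookup API.
        static PColumn resolve(PTable table, Cell kv) throws SQLException {
            PColumnFamily family = table.getColumnFamily(CellUtil.cloneFamily(kv));
            // With StorageScheme.ENCODED_COLUMN_NAMES the qualifier carries a serialized integer
            // rather than the column name, so the qualifier-based lookup is used instead of
            // getPColumnForColumnNameBytes().
            return family.getPColumnForColumnQualifier(CellUtil.cloneQualifier(kv));
        }
    }

The IndexTestUtil change later in this commit follows the same pattern when walking data-table cells.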

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index a556f76..5cf1465 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -40,6 +40,7 @@ public class PColumnImpl implements PColumn {
     private String expressionStr;
     private boolean isRowTimestamp;
     private boolean isDynamic;
+    private Integer columnQualifier;
     
     public PColumnImpl() {
     }
@@ -51,13 +52,13 @@ public class PColumnImpl implements PColumn {
                        Integer scale,
                        boolean nullable,
                        int position,
-                       SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) {
-        init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic);
+                       SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic, Integer columnQualifier) {
+        init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic, columnQualifier);
     }
 
     public PColumnImpl(PColumn column, int position) {
         this(column.getName(), column.getFamilyName(), column.getDataType(), column.getMaxLength(),
-                column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic());
+                column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic(), column.getColumnQualifier());
     }
 
     private void init(PName name,
@@ -69,7 +70,7 @@ public class PColumnImpl implements PColumn {
             int position,
             SortOrder sortOrder,
             Integer arrSize,
-            byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) {
+            byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic, Integer columnQualifier) {
     	Preconditions.checkNotNull(sortOrder);
         this.dataType = dataType;
         if (familyName == null) {
@@ -94,6 +95,7 @@ public class PColumnImpl implements PColumn {
         this.expressionStr = expressionStr;
         this.isRowTimestamp = isRowTimestamp;
         this.isDynamic = isDynamic;
+        this.columnQualifier = columnQualifier;
     }
 
     @Override
@@ -205,6 +207,11 @@ public class PColumnImpl implements PColumn {
     public boolean isDynamic() {
         return isDynamic;
     }
+    
+    @Override
+    public Integer getColumnQualifier() {
+        return columnQualifier;
+    }
 
     /**
      * Create a PColumn instance from PBed PColumn instance
@@ -251,8 +258,12 @@ public class PColumnImpl implements PColumn {
         if (column.hasIsDynamic()) {
         	isDynamic = column.getIsDynamic();
         }
+        Integer columnQualifier = null;
+        if (column.hasColumnQualifier()) {
+            columnQualifier = column.getColumnQualifier();
+        }
         return new PColumnImpl(columnName, familyName, dataType, maxLength, scale, nullable, position, sortOrder,
-                arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic);
+                arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic, columnQualifier);
     }
 
     public static PTableProtos.PColumn toProto(PColumn column) {
@@ -283,6 +294,9 @@ public class PColumnImpl implements PColumn {
             builder.setExpression(column.getExpressionStr());
         }
         builder.setIsRowTimestamp(column.isRowTimestamp());
+        if (column.getColumnQualifier() != null) {
+            builder.setColumnQualifier(column.getColumnQualifier());
+        }
         return builder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 413d116..8c85ae5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -399,7 +399,7 @@ public class PMetaDataImpl implements PMetaData {
             if (familyName == null) {
                 column = table.getPKColumn(columnToRemove.getName().getString());
             } else {
-                column = table.getColumnFamily(familyName).getColumn(columnToRemove.getName().getString());
+                column = table.getColumnFamily(familyName).getPColumnForColumnName(columnToRemove.getName().getString());
             }
             int positionOffset = 0;
             int position = column.getPosition();
@@ -414,7 +414,7 @@ public class PMetaDataImpl implements PMetaData {
             // Update position of columns that follow removed column
             for (int i = position+1; i < oldColumns.size(); i++) {
                 PColumn oldColumn = oldColumns.get(i);
-                PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp(), oldColumn.isDynamic());
+                PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp(), oldColumn.isDynamic(), oldColumn.getColumnQualifier());
                 columns.add(newColumn);
             }
             

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
index 0e1337c..8df6a95 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
@@ -83,6 +83,32 @@ public interface PName {
             return 0;
         }
     };
+    public static PName ENCODED_EMPTY_COLUMN_NAME = new PName() {
+        @Override
+        public String getString() {
+            return String.valueOf(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+        }
+
+        @Override
+        public byte[] getBytes() {
+            return QueryConstants.ENCODED_EMPTY_COLUMN_BYTES;
+        }
+        
+        @Override
+        public String toString() {
+            return getString();
+        }
+
+        @Override
+        public ImmutableBytesPtr getBytesPtr() {
+            return QueryConstants.ENCODED_EMPTY_COLUMN_BYTES_PTR;
+        }
+
+        @Override
+        public int getEstimatedSize() {
+            return 0;
+        }
+    };
     /**
      * Get the client-side, normalized name as referenced
      * in a SQL statement.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index b2a1d58..662f114 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -154,6 +154,34 @@ public interface PTable extends PMetaDataEntity {
             return LinkType.values()[serializedValue-1];
         }
     }
+    
+    public enum StorageScheme {
+        ENCODED_COLUMN_NAMES((byte)1),
+        NON_ENCODED_COLUMN_NAMES((byte)2);
+
+        private final byte[] byteValue;
+        private final byte serializedValue;
+
+        StorageScheme(byte serializedValue) {
+            this.serializedValue = serializedValue;
+            this.byteValue = Bytes.toBytes(this.name());
+        }
+
+        public byte[] getBytes() {
+            return byteValue;
+        }
+
+        public byte getSerializedValue() {
+            return this.serializedValue;
+        }
+
+        public static StorageScheme fromSerializedValue(byte serializedValue) {
+            if (serializedValue < 1 || serializedValue > StorageScheme.values().length) {
+                return null;
+            }
+            return StorageScheme.values()[serializedValue-1];
+        }
+    }
 
     long getTimeStamp();
     long getSequenceNumber();
@@ -209,7 +237,17 @@ public interface PTable extends PMetaDataEntity {
      * can be found
      * @throws AmbiguousColumnException if multiple columns are found with the given name
      */
-    PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException;
+    //TODO: samarth inspect all the callers of this method.
+    PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException;
+    
+    /**
+     * Get the column with the given column qualifier.
+     * @param cq column qualifier bytes
+     * @return the PColumn with the given column qualifier
+     * @throws ColumnNotFoundException if no column with the given column qualifier can be found
+     * @throws AmbiguousColumnException if multiple columns are found with the given column qualifier
+     */
+    PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException; 
     
     /**
      * Get the PK column with the given name.
@@ -341,4 +379,5 @@ public interface PTable extends PMetaDataEntity {
      */
     int getRowTimestampColPos();
     long getUpdateCacheFrequency();
+    StorageScheme getStorageScheme();
 }
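
A quick sanity sketch of the StorageScheme byte encoding added above (illustrative only, not a test from
this commit). fromSerializedValue() maps bytes 1 and 2 back to the enum constants and returns null for
anything out of range:

    import org.apache.phoenix.schema.PTable.StorageScheme;

    // Illustrative round-trip check of the serialized values; not part of the patch.
    class StorageSchemeRoundTrip {
        public static void main(String[] args) {
            for (StorageScheme scheme : StorageScheme.values()) {
                // getSerializedValue() is what CREATE TABLE writes into SYSTEM.CATALOG (parameter 23 above).
                byte serialized = scheme.getSerializedValue();
                if (StorageScheme.fromSerializedValue(serialized) != scheme) {
                    throw new AssertionError("round trip failed for " + scheme);
                }
            }
            // Out-of-range bytes are mapped to null rather than throwing.
            if (StorageScheme.fromSerializedValue((byte) 0) != null
                    || StorageScheme.fromSerializedValue((byte) 3) != null) {
                throw new AssertionError("out-of-range values should map to null");
            }
        }
    }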


[3/3] phoenix git commit: Replace empty key value with unit tests passing

Posted by sa...@apache.org.
Replace empty key value with unit tests passing


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/42294c4b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/42294c4b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/42294c4b

Branch: refs/heads/encodecolumns
Commit: 42294c4ba60da17d22e95bae6b4013871749b025
Parents: be5191b
Author: Samarth <sa...@salesforce.com>
Authored: Mon Mar 21 13:52:53 2016 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Mon Mar 21 13:52:53 2016 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AlterTableIT.java    |  22 +-
 .../apache/phoenix/end2end/UpsertValuesIT.java  |  42 ++++
 .../phoenix/end2end/index/IndexTestUtil.java    |  12 +-
 .../phoenix/compile/CreateTableCompiler.java    |   2 +-
 .../phoenix/compile/ExpressionCompiler.java     |   2 +-
 .../apache/phoenix/compile/FromCompiler.java    |  28 ++-
 .../apache/phoenix/compile/JoinCompiler.java    |   7 +-
 .../phoenix/compile/ListJarsQueryPlan.java      |   2 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |  11 +-
 .../phoenix/compile/ProjectionCompiler.java     |   9 +-
 .../apache/phoenix/compile/TraceQueryPlan.java  |   2 +-
 .../compile/TupleProjectionCompiler.java        |   6 +-
 .../apache/phoenix/compile/UnionCompiler.java   |   5 +-
 .../apache/phoenix/compile/WhereCompiler.java   |   6 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   3 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  41 +++-
 .../coprocessor/SequenceRegionObserver.java     |   1 +
 .../UngroupedAggregateRegionObserver.java       |   8 +-
 .../coprocessor/generated/PTableProtos.java     | 242 ++++++++++++++++---
 .../apache/phoenix/execute/MutationState.java   |   2 +-
 .../expression/KeyValueColumnExpression.java    |  13 +-
 .../expression/ProjectedColumnExpression.java   |   1 +
 .../phoenix/filter/ColumnProjectionFilter.java  |  24 +-
 .../filter/MultiKeyValueComparisonFilter.java   |   2 +-
 .../filter/SingleKeyValueComparisonFilter.java  |   2 +-
 .../apache/phoenix/index/IndexMaintainer.java   |  42 ++--
 .../index/PhoenixTransactionalIndexer.java      |  12 +-
 .../phoenix/iterate/BaseResultIterators.java    |   5 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   4 +
 .../phoenix/mapreduce/AbstractBulkLoadTool.java |   2 +-
 .../mapreduce/FormatToBytesWritableMapper.java  |  44 ++--
 .../mapreduce/FormatToKeyValueReducer.java      |  10 +-
 .../query/ConnectionQueryServicesImpl.java      |   1 +
 .../apache/phoenix/query/QueryConstants.java    |  14 +-
 .../org/apache/phoenix/schema/ColumnRef.java    |   4 +-
 .../apache/phoenix/schema/DelegateColumn.java   |   5 +
 .../apache/phoenix/schema/DelegateTable.java    |  14 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 100 ++++++--
 .../java/org/apache/phoenix/schema/PColumn.java |   7 +-
 .../apache/phoenix/schema/PColumnFamily.java    |  14 +-
 .../phoenix/schema/PColumnFamilyImpl.java       |  47 ++--
 .../org/apache/phoenix/schema/PColumnImpl.java  |  24 +-
 .../apache/phoenix/schema/PMetaDataImpl.java    |   4 +-
 .../java/org/apache/phoenix/schema/PName.java   |  26 ++
 .../java/org/apache/phoenix/schema/PTable.java  |  41 +++-
 .../org/apache/phoenix/schema/PTableImpl.java   | 136 ++++++++---
 .../apache/phoenix/schema/ProjectedColumn.java  |   1 +
 .../org/apache/phoenix/schema/SaltingUtil.java  |   2 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |   7 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |   8 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |  59 ++++-
 .../phoenix/compile/HavingCompilerTest.java     |   2 +-
 .../phoenix/compile/QueryCompilerTest.java      |   2 +-
 .../phoenix/compile/WhereCompilerTest.java      |  26 +-
 .../phoenix/execute/CorrelatePlanTest.java      |   4 +-
 .../phoenix/execute/UnnestArrayPlanTest.java    |   4 +-
 .../expression/ColumnExpressionTest.java        |  16 +-
 .../phoenix/index/IndexMaintainerTest.java      |   3 +-
 .../iterate/AggregateResultScannerTest.java     |   6 +-
 .../query/BaseConnectionlessQueryTest.java      |  18 +-
 .../phoenix/query/ConnectionlessTest.java       |  15 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |   4 +-
 phoenix-protocol/src/main/PTable.proto          |   2 +
 63 files changed, 933 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 122ec16..a1313dc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -36,7 +36,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -45,7 +44,9 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -56,7 +57,6 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -65,8 +65,6 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-
 /**
  *
  * A lot of tests in this class test HBase level properties. As a result,
@@ -2123,23 +2121,23 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
             PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); 
             PTable table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), "T1"));
             // Assert that the column shows up as row time stamp in the cache.
-            assertTrue(table.getColumn("PK1").isRowTimestamp());
-            assertFalse(table.getColumn("PK2").isRowTimestamp());
+            assertTrue(table.getPColumnForColumnName("PK1").isRowTimestamp());
+            assertFalse(table.getPColumnForColumnName("PK2").isRowTimestamp());
             assertIsRowTimestampSet("T1", "PK1");
             
             conn.createStatement().execute("CREATE TABLE T6 (PK1 VARCHAR, PK2 DATE PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR, KV2 INTEGER)");
             table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), "T6"));
             // Assert that the column shows up as row time stamp in the cache.
-            assertFalse(table.getColumn("PK1").isRowTimestamp());
-            assertTrue(table.getColumn("PK2").isRowTimestamp());
+            assertFalse(table.getPColumnForColumnName("PK1").isRowTimestamp());
+            assertTrue(table.getPColumnForColumnName("PK2").isRowTimestamp());
             assertIsRowTimestampSet("T6", "PK2");
             
             // Create an index on a table has a row time stamp pk column. The column should show up as a row time stamp column for the index too. 
             conn.createStatement().execute("CREATE INDEX T6_IDX ON T6 (KV1) include (KV2)");
             PTable indexTable = phxConn.getTable(new PTableKey(phxConn.getTenantId(), "T6_IDX"));
-            String indexColName = IndexUtil.getIndexColumnName(table.getColumn("PK2"));
+            String indexColName = IndexUtil.getIndexColumnName(table.getPColumnForColumnName("PK2"));
             // Assert that the column shows up as row time stamp in the cache.
-            assertTrue(indexTable.getColumn(indexColName).isRowTimestamp());
+            assertTrue(indexTable.getPColumnForColumnName(indexColName).isRowTimestamp());
             assertIsRowTimestampSet("T6_IDX", indexColName);
             
             // Creating a view with a row_timestamp column in its pk constraint is not allowed
@@ -2204,6 +2202,6 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
 			}
 		}
 	}
-    
+	
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 78dce80..3fec718 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -37,12 +37,20 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
@@ -922,6 +930,40 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT {
         }
     }
     
+    @Test
+    public void testColumnQualifierForUpsertedValues() throws Exception {
+        String schemaName = "A";
+        String tableName = "TEST";
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String ddl = "create table " + fullTableName 
+                + " (" 
+                + " K varchar primary key,"
+                + " CF1.V1 varchar, CF2.V2 VARCHAR, CF2.V3 VARCHAR)";
+        try (Connection conn = getConnection(nextTimestamp())) {
+            conn.createStatement().execute(ddl);
+        }
+        String dml = "UPSERT INTO " + fullTableName + " VALUES (?, ?, ?, ?)";
+        try (Connection conn = getConnection(nextTimestamp())) {
+            PreparedStatement stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "KEY1");
+            stmt.setString(2, "VALUE1");
+            stmt.setString(3, "VALUE2");
+            stmt.setString(4, "VALUE3");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        // Issue a raw HBase scan and assert that the key values have the expected column qualifiers.
+        try (Connection conn = getConnection(nextTimestamp())) {
+            HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName));
+            ResultScanner scanner = table.getScanner(new Scan());
+            Result next = scanner.next();
+            assertTrue(next.containsColumn(Bytes.toBytes("CF1"), PInteger.INSTANCE.toBytes(1)));
+            assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(1)));
+            assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(2)));
+        }
+    }
+    
+    
     private static Connection getConnection(long ts) throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
index ba04ad7..42c0264 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
@@ -39,15 +39,14 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -115,7 +114,7 @@ public class IndexTestUtil {
         while ((hasValue = dataRowKeySchema.next(ptr, i, maxOffset)) != null) {
             if (hasValue) {
                 PColumn dataColumn = dataPKColumns.get(i);
-                PColumn indexColumn = indexTable.getColumn(IndexUtil.getIndexColumnName(dataColumn));
+                PColumn indexColumn = indexTable.getPColumnForColumnName(IndexUtil.getIndexColumnName(dataColumn));
                 coerceDataValueToIndexValue(dataColumn, indexColumn, ptr);
                 indexValues[indexColumn.getPosition()-indexOffset] = ptr.copyBytes();
             }
@@ -135,10 +134,11 @@ public class IndexTestUtil {
                     for (Cell kv : entry.getValue()) {
                         @SuppressWarnings("deprecation")
                         byte[] cq = kv.getQualifier();
-                        if (Bytes.compareTo(QueryConstants.EMPTY_COLUMN_BYTES, cq) != 0) {
+                        byte[] emptyKVQualifier = SchemaUtil.getEmptyKeyValueInfo(dataTable).getFirst();
+                        if (Bytes.compareTo(emptyKVQualifier, cq) != 0) {
                             try {
-                                PColumn dataColumn = family.getColumn(cq);
-                                PColumn indexColumn = indexTable.getColumn(IndexUtil.getIndexColumnName(family.getName().getString(), dataColumn.getName().getString()));
+                                PColumn dataColumn = family.getPColumnForColumnQualifier(cq);
+                                PColumn indexColumn = indexTable.getPColumnForColumnName(IndexUtil.getIndexColumnName(family.getName().getString(), dataColumn.getName().getString()));
                                 ptr.set(kv.getValueArray(),kv.getValueOffset(),kv.getValueLength());
                                 coerceDataValueToIndexValue(dataColumn, indexColumn, ptr);
                                 indexValues[indexPKColumns.indexOf(indexColumn)-indexOffset] = ptr.copyBytes();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index e032feb..03f679b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -290,7 +290,7 @@ public class CreateTableCompiler {
         @Override
         public Boolean visit(KeyValueColumnExpression node) {
             try {
-                this.position = table.getColumnFamily(node.getColumnFamily()).getColumn(node.getColumnName()).getPosition();
+                this.position = table.getColumnFamily(node.getColumnFamily()).getPColumnForColumnQualifier(node.getColumnQualifier()).getPosition();
             } catch (SQLException e) {
                 throw new RuntimeException(e); // Impossible
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index 1278494..31a2f82 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -407,7 +407,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio
             return LiteralExpression.newConstant(column.getDataType().toObject(ptr), column.getDataType());
         }
         if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(column)) { // project only kv columns
-            context.getScan().addColumn(column.getFamilyName().getBytes(), column.getName().getBytes());
+            context.getScan().addColumn(column.getFamilyName().getBytes(), SchemaUtil.getColumnQualifier(column, tableRef.getTable()));
         }
         Expression expression = ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive());
         Expression wrappedExpression = wrapGroupByExpression(expression);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index ffe9621..8462812 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -122,10 +122,12 @@ public class FromCompiler {
             throw new ColumnNotFoundException(schemaName, tableName, null, colName);
         }
         
+        @Override
         public PFunction resolveFunction(String functionName) throws SQLException {
             throw new FunctionNotFoundException(functionName);
         };
 
+        @Override
         public boolean hasUDFs() {
             return false;
         };
@@ -158,7 +160,7 @@ public class FromCompiler {
                     if (htable != null) Closeables.closeQuietly(htable);
                 }
                 tableNode = NamedTableNode.create(null, baseTable, statement.getColumnDefs());
-                return new SingleTableColumnResolver(connection, tableNode, e.getTimeStamp(), new HashMap<String, UDFParseNode>(1));
+                return new SingleTableColumnResolver(connection, tableNode, e.getTimeStamp(), new HashMap<String, UDFParseNode>(1), false);
             }
             throw e;
         }
@@ -221,7 +223,7 @@ public class FromCompiler {
             Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression();
             PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(),
                     sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
-                    column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic());
+                    column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic(), column.getColumnQualifier());
             projectedColumns.add(projectedColumn);
         }
         PTable t = PTableImpl.makePTable(table, projectedColumns);
@@ -258,12 +260,12 @@ public class FromCompiler {
     	private final List<TableRef> tableRefs;
     	private final String alias;
 
-       public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes) throws SQLException  {
+       public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes, boolean useEncodedColumnNames) throws SQLException  {
            super(connection, 0, false, udfParseNodes);
            List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size());
            for (ColumnDef def : table.getDynamicColumns()) {
                if (def.getColumnDefName().getFamilyName() != null) {
-                   families.add(new PColumnFamilyImpl(PNameFactory.newName(def.getColumnDefName().getFamilyName()),Collections.<PColumn>emptyList()));
+                   families.add(new PColumnFamilyImpl(PNameFactory.newName(def.getColumnDefName().getFamilyName()),Collections.<PColumn>emptyList(), useEncodedColumnNames));
                }
            }
            Long scn = connection.getSCN();
@@ -360,8 +362,8 @@ public class FromCompiler {
 
 			}
         	PColumn column = resolveCF
-        	        ? tableRef.getTable().getColumnFamily(tableName).getColumn(colName)
-        			: tableRef.getTable().getColumn(colName);
+        	        ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName)
+        			: tableRef.getTable().getPColumnForColumnName(colName);
             return new ColumnRef(tableRef, column.getPosition());
 		}
     }
@@ -546,8 +548,9 @@ public class FromCompiler {
                         theTable.getColumnFamily(family); // Verifies that column family exists
                         familyName = PNameFactory.newName(family);
                     }
+                    // Dynamic columns don't have a corresponding column qualifier
                     allcolumns.add(new PColumnImpl(name, familyName, dynColumn.getDataType(), dynColumn.getMaxLength(),
-                            dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true));
+                            dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true, null));
                     position++;
                 }
                 theTable = PTableImpl.makePTable(theTable, allcolumns);
@@ -643,16 +646,17 @@ public class FromCompiler {
                     // referenced by an outer wild-card select.
                     alias = String.valueOf(position);
                 }
+                //TODO: samarth confirm this is the right change i.e. using null for column qualifier
                 PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias),
                         PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY),
-                        null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false);
+                        null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false, null);
                 columns.add(column);
             }
             PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
                     PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                     null, null, columns, null, null, Collections.<PTable>emptyList(),
                     false, Collections.<PName>emptyList(), null, null, false, false, false, null,
-                    null, null, false, false, 0, 0L);
+                    null, null, false, false, 0, 0L, null);
 
             String alias = subselectNode.getAlias();
             TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);
@@ -727,7 +731,7 @@ public class FromCompiler {
                 while (iterator.hasNext()) {
                     TableRef tableRef = iterator.next();
                     try {
-                        PColumn column = tableRef.getTable().getColumn(colName);
+                        PColumn column = tableRef.getTable().getPColumnForColumnName(colName);
                         if (theTableRef != null) { throw new AmbiguousColumnException(colName); }
                         theTableRef = tableRef;
                         theColumnPosition = column.getPosition();
@@ -740,12 +744,12 @@ public class FromCompiler {
             } else {
                 try {
                     TableRef tableRef = resolveTable(schemaName, tableName);
-                    PColumn column = tableRef.getTable().getColumn(colName);
+                    PColumn column = tableRef.getTable().getPColumnForColumnName(colName);
                     return new ColumnRef(tableRef, column.getPosition());
                 } catch (TableNotFoundException e) {
                     // Try using the tableName as a columnFamily reference instead
                     ColumnFamilyRef cfRef = resolveColumnFamily(schemaName, tableName);
-                    PColumn column = cfRef.getFamily().getColumn(colName);
+                    PColumn column = cfRef.getFamily().getPColumnForColumnName(colName);
                     return new ColumnRef(cfRef.getTableRef(), column.getPosition());
                 }
             }
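
A note on the getColumn -> getPColumnForColumnName renames above: with encoded column qualifiers a column can now be located either by its user-visible name or by an integer qualifier, so the single lookup path splits in two. The sketch below is illustrative only (the class and method names are not Phoenix's); it assumes, as the dynamic-column comment in this hunk states, that dynamic columns carry no qualifier.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative dual lookup: by column name and by integer column qualifier.
    final class ColumnLookupSketch<C> {
        private final Map<String, C> byName = new HashMap<>();
        private final Map<Integer, C> byQualifier = new HashMap<>();

        void add(String name, Integer qualifier, C column) {
            byName.put(name, column);
            if (qualifier != null) {              // dynamic columns have no qualifier
                byQualifier.put(qualifier, column);
            }
        }

        C forColumnName(String name) {            // analogous to getPColumnForColumnName
            return byName.get(name);
        }

        C forColumnQualifier(int qualifier) {     // analogous to getPColumnForColumnQualifier
            return byQualifier.get(qualifier);
        }
    }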

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 5d03f57..5f4adf6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -74,6 +74,7 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ProjectedColumn;
@@ -714,7 +715,7 @@ public class JoinCompiler {
                 if (columnRef.getTableRef().equals(tableRef)
                         && !SchemaUtil.isPKColumn(columnRef.getColumn())
                         && !(columnRef instanceof LocalIndexColumnRef)) {
-                    scan.addColumn(columnRef.getColumn().getFamilyName().getBytes(), columnRef.getColumn().getName().getBytes());
+                    scan.addColumn(columnRef.getColumn().getFamilyName().getBytes(), SchemaUtil.getColumnQualifier(columnRef.getColumn(), tableRef.getTable()));
                 }
             }
         }
@@ -1295,14 +1296,14 @@ public class JoinCompiler {
         if (left.getBucketNum() != null) {
             merged.remove(0);
         }
-        
+        //TODO: samarth should the projected join table always have non-encoded column names?
         return PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(),
                 PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())),
                 left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), 
                 left.getBucketNum(), merged,left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(),
                 left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL,
                 left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(),
-                left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp());
+                left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp(), StorageScheme.NON_ENCODED_COLUMN_NAMES);
     }
 
 }
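
The scan.addColumn change above swaps the raw column-name bytes for SchemaUtil.getColumnQualifier(column, table), whose implementation is not part of this hunk. A hedged sketch of what such a helper presumably does, given the NON_ENCODED_COLUMN_NAMES scheme passed to makePTable here, is shown below; the method and constants are assumptions, not the Phoenix code.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Sketch only: choose the qualifier bytes for a column based on the table's storage scheme.
    final class ColumnQualifierSketch {
        static byte[] columnQualifier(boolean encodedScheme, String columnName, Integer encodedQualifier) {
            if (!encodedScheme || encodedQualifier == null) {
                // NON_ENCODED_COLUMN_NAMES: keep using the column name, as before this patch.
                return columnName.getBytes(StandardCharsets.UTF_8);
            }
            // Encoded scheme: use the serialized integer qualifier (big-endian here for illustration).
            return ByteBuffer.allocate(Integer.BYTES).putInt(encodedQualifier).array();
        }
    }

The TODO in this hunk asks whether projected join tables should always stay non-encoded, which matches the explicit NON_ENCODED_COLUMN_NAMES argument added to makePTable above.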

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 7f3277a..2a7c507 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -80,7 +80,7 @@ public class ListJarsQueryPlan implements QueryPlan {
         PColumn column =
                 new PColumnImpl(PNameFactory.newName("jar_location"), null,
                         PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
-                        false, null, false, false);
+                        false, null, false, false, null);
         List<PColumn> columns = new ArrayList<PColumn>();
         columns.add(column);
         Expression expression =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 2659b3f..10087af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
@@ -164,8 +165,8 @@ public class PostDDLCompiler {
                             @Override
                             public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException {
                                 PColumn column = tableName != null
-                                        ? tableRef.getTable().getColumnFamily(tableName).getColumn(colName)
-                                        : tableRef.getTable().getColumn(colName);
+                                        ? tableRef.getTable().getColumnFamily(tableName).getPColumnForColumnName(colName)
+                                        : tableRef.getTable().getPColumnForColumnName(colName);
                                 return new ColumnRef(tableRef, column.getPosition());
                             }
                             
@@ -191,6 +192,7 @@ public class PostDDLCompiler {
                         ScanUtil.setTimeRange(scan, ts);
                         if (emptyCF != null) {
                             scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
+                            scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, SchemaUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
                         }
                         ServerCache cache = null;
                         try {
@@ -214,11 +216,12 @@ public class PostDDLCompiler {
                                     // data empty column family to stay the same, while the index empty column family
                                     // changes.
                                     PColumn column = deleteList.get(0);
+                                    byte[] cq = SchemaUtil.getColumnQualifier(column, tableRef.getTable());
                                     if (emptyCF == null) {
-                                        scan.addColumn(column.getFamilyName().getBytes(), column.getName().getBytes());
+                                        scan.addColumn(column.getFamilyName().getBytes(), cq);
                                     }
                                     scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes());
-                                    scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, column.getName().getBytes());
+                                    scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
                                 }
                             }
                             List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
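
PostDDLCompiler now also stamps the scan with EMPTY_COLUMN_QUALIFIER, taken from SchemaUtil.getEmptyKeyValueInfo(table), and routes DELETE_CQ through the same qualifier helper. getEmptyKeyValueInfo is not shown in this patch; the sketch below only illustrates the idea of a (qualifier, value) pair that depends on the storage scheme. The "_0", "x" and qualifier-0 constants are assumptions for illustration.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Map;

    // Sketch of an empty-key-value chooser; not the actual SchemaUtil code.
    final class EmptyKeyValueSketch {
        static Map.Entry<byte[], byte[]> emptyKeyValueInfo(boolean encodedColumns) {
            byte[] legacyQualifier = "_0".getBytes(StandardCharsets.UTF_8);                   // assumed legacy empty column
            byte[] encodedQualifier = ByteBuffer.allocate(Integer.BYTES).putInt(0).array();   // assumed encoded qualifier
            byte[] value = "x".getBytes(StandardCharsets.UTF_8);                              // non-empty; see the Tephra FIXME later in this patch
            return new SimpleImmutableEntry<>(encodedColumns ? encodedQualifier : legacyQualifier, value);
        }
    }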

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 3cf3934..b6a6771 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -217,7 +217,7 @@ public class ProjectionCompiler {
             PColumn indexColumn = null;
             ColumnRef ref = null;
             try {
-                indexColumn = index.getColumn(indexColName);
+                indexColumn = index.getPColumnForColumnName(indexColName);
                 ref = new ColumnRef(tableRef, indexColumn.getPosition());
             } catch (ColumnNotFoundException e) {
                 if (index.getIndexType() == IndexType.LOCAL) {
@@ -288,7 +288,7 @@ public class ProjectionCompiler {
             PColumn indexColumn = null;
             ColumnRef ref = null;
             try {
-                indexColumn = index.getColumn(indexColName);
+                indexColumn = index.getPColumnForColumnName(indexColName);
                 ref = new ColumnRef(tableRef, indexColumn.getPosition());
             } catch (ColumnNotFoundException e) {
                 if (index.getIndexType() == IndexType.LOCAL) {
@@ -476,7 +476,7 @@ public class ProjectionCompiler {
                 }
             } else {
                 for (byte[] cq : entry.getValue()) {
-                    PColumn column = family.getColumn(cq);
+                    PColumn column = family.getPColumnForColumnQualifier(cq);
                     Integer maxLength = column.getMaxLength();
                     int byteSize = column.getDataType().isFixedWidth() ? maxLength == null ? column.getDataType().getByteSize() : maxLength : RowKeySchema.ESTIMATED_VARIABLE_LENGTH_SIZE;
                     estimatedByteSize += SizedUtil.KEY_VALUE_SIZE + estimatedKeySize + byteSize;
@@ -702,7 +702,8 @@ public class ProjectionCompiler {
                      public Void visit(ProjectedColumnExpression expression) {
                          if (expression.getDataType().isArrayType()) {
                              indexProjectedColumns.add(expression);
-                             KeyValueColumnExpression keyValueColumnExpression = new KeyValueColumnExpression(expression.getColumn());
+                             //TODO: samarth confirm that passing false (i.e. the non-encoded column name) is the right choice here
+                             KeyValueColumnExpression keyValueColumnExpression = new KeyValueColumnExpression(expression.getColumn(), false);
                              indexKVs.add(keyValueColumnExpression);
                              copyOfChildren.set(0, keyValueColumnExpression);
                              Integer count = arrayExpressionCounts.get(expression);
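
Apart from switching the per-family lookup to getPColumnForColumnQualifier, the projection size estimate above is unchanged; since the nested ternary is easy to misread, here is the same arithmetic spelled out. The overhead constants are placeholders, not the real SizedUtil/RowKeySchema values.

    // Sketch of the per-cell estimate used when projecting a column family.
    final class ProjectionSizeSketch {
        static final int KEY_VALUE_OVERHEAD = 48;               // placeholder for SizedUtil.KEY_VALUE_SIZE
        static final int ESTIMATED_VARIABLE_LENGTH_SIZE = 10;   // placeholder for RowKeySchema's estimate

        static int estimatedCellSize(int estimatedKeySize, boolean fixedWidth, Integer maxLength, int typeByteSize) {
            int valueSize = fixedWidth
                    ? (maxLength != null ? maxLength : typeByteSize)
                    : ESTIMATED_VARIABLE_LENGTH_SIZE;
            return KEY_VALUE_OVERHEAD + estimatedKeySize + valueSize;
        }
    }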

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 58cdb64..de63249 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -78,7 +78,7 @@ public class TraceQueryPlan implements QueryPlan {
         PColumn column =
                 new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null,
                         PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
-                        false, null, false, false);
+                        false, null, false, false, null);
         List<PColumn> columns = new ArrayList<PColumn>();
         columns.add(column);
         Expression expression =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 4be78a9..0933e34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -136,7 +136,7 @@ public class TupleProjectionCompiler {
             projectedColumns.add(column);
             // Wildcard or FamilyWildcard will be handled by ProjectionCompiler.
             if (!isWildcard && !families.contains(sourceColumn.getFamilyName())) {
-                context.getScan().addColumn(sourceColumn.getFamilyName().getBytes(), sourceColumn.getName().getBytes());
+                context.getScan().addColumn(sourceColumn.getFamilyName().getBytes(), SchemaUtil.getColumnQualifier(column, table));
             }
         }
         // add LocalIndexDataColumnRef
@@ -152,7 +152,7 @@ public class TupleProjectionCompiler {
                 table.getBucketNum(), projectedColumns, table.getParentSchemaName(),
                 table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
 
     public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException {
@@ -179,7 +179,7 @@ public class TupleProjectionCompiler {
                     retainPKColumns ? table.getBucketNum() : null, projectedColumns, null,
                     null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
                     table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                    null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
+                    null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.getStorageScheme());
     }
 
     // For extracting column references from single select statement

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index b25baf7..0331015 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -73,16 +73,17 @@ public class UnionCompiler {
             ColumnProjector colProj = plan.getProjector().getColumnProjector(i);
             Expression sourceExpression = colProj.getExpression();
             String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias();
+            //TODO: samarth confirm this is the right change, i.e. that union projected columns should use a null column qualifier
             PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME,
                     sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
-                    i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false, false);
+                    i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false, false, null);
             projectedColumns.add(projectedColumn);
         }
         Long scn = statement.getConnection().getSCN();
         PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME, 
                 PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null,
                         projectedColumns, null, null, null,
-                        true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L);
+                        true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L, null);
         TableRef tableRef = new TableRef(null, tempTable, 0, false);
         return tableRef;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 13963d7..c2e42f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -170,8 +170,8 @@ public class WhereCompiler {
             TableRef tableRef = ref.getTableRef();
             if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(ref.getColumn())) {
                 // track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs
-                context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), ref.getColumn().getName()
-                        .getBytes());
+                byte[] cq = SchemaUtil.getColumnQualifier(ref.getColumn(), tableRef.getTable());
+                context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq);
             }
             return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive());
         }
@@ -194,7 +194,7 @@ public class WhereCompiler {
             // just use that.
             try {
                 if (!SchemaUtil.isPKColumn(ref.getColumn())) {
-                    table.getColumn(ref.getColumn().getName().getString());
+                    table.getPColumnForColumnName(ref.getColumn().getName().getString());
                 }
             } catch (AmbiguousColumnException e) {
                 disambiguateWithFamily = true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9b440ac..af154fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -77,6 +77,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String DELETE_CQ = "_DeleteCQ";
     public static final String DELETE_CF = "_DeleteCF";
     public static final String EMPTY_CF = "_EmptyCF";
+    public static final String EMPTY_COLUMN_QUALIFIER = "_EmptyColumnQualifier";
     public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
     public static final String GROUP_BY_LIMIT = "_GroupByLimit";
     public static final String LOCAL_INDEX = "_LocalIndex";
@@ -399,7 +400,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
                             Cell kv = tuple.getValue(idx);
                             if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length,
                                     kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength())
-                                && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length,
+                                && Bytes.equals(kvExp.getColumnQualifier(), 0, kvExp.getColumnQualifier().length,
                                         kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) {
                                 // remove the kv that has the full array values.
                                 result.remove(idx);
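
The array-index handling above now matches cells on the expression's column qualifier rather than its column name. The match itself is just a family-and-qualifier byte comparison; a self-contained sketch (plain byte arrays instead of HBase Cells and Bytes.equals) is:

    import java.util.Arrays;

    // Sketch: does the (family, qualifier) pair identify this cell? Mirrors the two
    // Bytes.equals checks in the hunk above.
    final class CellMatchSketch {
        static boolean matches(byte[] cellFamily, byte[] cellQualifier,
                               byte[] expectedFamily, byte[] expectedQualifier) {
            return Arrays.equals(cellFamily, expectedFamily)
                    && Arrays.equals(cellQualifier, expectedQualifier);
        }
    }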

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index bcfdf20..a2db5d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -19,12 +19,14 @@ package org.apache.phoenix.coprocessor;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.phoenix.schema.PTable.StorageScheme;
 import static org.apache.hadoop.hbase.KeyValue.createFirstOnRow;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES;
@@ -54,6 +56,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
@@ -178,6 +181,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
@@ -246,6 +250,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
     private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
     private static final KeyValue UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES);
+    private static final KeyValue STORAGE_SCHEME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, STORAGE_SCHEME_BYTES);
     
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
@@ -269,7 +274,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             BASE_COLUMN_COUNT_KV,
             ROW_KEY_ORDER_OPTIMIZABLE_KV,
             TRANSACTIONAL_KV,
-            UPDATE_CACHE_FREQUENCY_KV
+            UPDATE_CACHE_FREQUENCY_KV,
+            STORAGE_SCHEME_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -296,6 +302,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
     private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV);
     private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV);
+    private static final int STORAGE_SCHEME_INDEX = TABLE_KV_COLUMNS.indexOf(STORAGE_SCHEME_KV);
 
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -309,6 +316,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue IS_VIEW_REFERENCED_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_VIEW_REFERENCED_BYTES);
     private static final KeyValue COLUMN_DEF_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_DEF_BYTES);
     private static final KeyValue IS_ROW_TIMESTAMP_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, IS_ROW_TIMESTAMP_BYTES);
+    private static final KeyValue COLUMN_QUALIFIER_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_QUALIFIER_BYTES);
     private static final List<KeyValue> COLUMN_KV_COLUMNS = Arrays.<KeyValue>asList(
             DECIMAL_DIGITS_KV,
             COLUMN_SIZE_KV,
@@ -321,7 +329,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             VIEW_CONSTANT_KV,
             IS_VIEW_REFERENCED_KV,
             COLUMN_DEF_KV,
-            IS_ROW_TIMESTAMP_KV
+            IS_ROW_TIMESTAMP_KV,
+            COLUMN_QUALIFIER_KV
             );
     static {
         Collections.sort(COLUMN_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -337,6 +346,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int IS_VIEW_REFERENCED_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_VIEW_REFERENCED_KV);
     private static final int COLUMN_DEF_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_DEF_KV);
     private static final int IS_ROW_TIMESTAMP_INDEX = COLUMN_KV_COLUMNS.indexOf(IS_ROW_TIMESTAMP_KV);
+    private static final int COLUMN_QUALIFIER_INDEX = COLUMN_KV_COLUMNS.indexOf(COLUMN_QUALIFIER_KV);
     
     private static final int LINK_TYPE_INDEX = 0;
 
@@ -642,8 +652,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(
                         isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(),
                         isRowTimestampKV.getValueLength()));
+        Cell columnQualifierKV = colKeyValues[COLUMN_QUALIFIER_INDEX];
+        Integer columnQualifier = columnQualifierKV == null ? null :
+            PInteger.INSTANCE.getCodec().decodeInt(columnQualifierKV.getValueArray(), columnQualifierKV.getValueOffset(), SortOrder.getDefault());
 
-        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false);
+        PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false, columnQualifier);
         columns.add(column);
     }
     
@@ -840,7 +853,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP];
         long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(),
                 indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault());
-        
+        Cell storageSchemeKv = tableKeyValues[STORAGE_SCHEME_INDEX];
+        StorageScheme storageScheme = storageSchemeKv == null ? null : StorageScheme.fromSerializedValue(
+                (byte) PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(), storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength()));
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = new ArrayList<PTable>();
         List<PName> physicalTables = new ArrayList<PName>();
@@ -885,7 +900,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
             tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement,
             disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency,
-            stats, baseColumnCount, indexDisableTimestamp);
+            stats, baseColumnCount, indexDisableTimestamp, storageScheme);
     }
 
     private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
@@ -1901,8 +1916,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 String columnName = Bytes.toString(rkmd[COLUMN_NAME_INDEX]);
                 String columnFamily = rkmd[FAMILY_NAME_INDEX] == null ? null : Bytes.toString(rkmd[FAMILY_NAME_INDEX]);
                 try {
-                    existingViewColumn = columnFamily == null ? view.getColumn(columnName) : view.getColumnFamily(
-                            columnFamily).getColumn(columnName);
+                    existingViewColumn = columnFamily == null ? view.getPColumnForColumnName(columnName) : view.getColumnFamily(
+                            columnFamily).getPColumnForColumnName(columnName);
                 } catch (ColumnFamilyNotFoundException e) {
                     // ignore since it means that the column family is not present for the column to be added.
                 } catch (ColumnNotFoundException e) {
@@ -2352,7 +2367,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                         && rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX].length > 0) {
                                     PColumnFamily family =
                                             table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
-                                    family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                    family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                 } else if (pkCount > COLUMN_NAME_INDEX
                                         && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                     addingPKColumn = true;
@@ -2600,7 +2615,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                         PColumnFamily family =
                                                 table.getColumnFamily(rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
                                         columnToDelete =
-                                                family.getColumn(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
+                                                family.getPColumnForColumnNameBytes(rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX]);
                                     } else if (pkCount > COLUMN_NAME_INDEX
                                             && rowKeyMetaData[PhoenixDatabaseMetaData.COLUMN_NAME_INDEX].length > 0) {
                                         deletePKColumn = true;
@@ -2644,8 +2659,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                             byte[] indexKey =
                                                     SchemaUtil.getTableKey(tenantId, index
                                                             .getSchemaName().getBytes(), index.getTableName().getBytes());
+                                            byte[] cq = SchemaUtil.getColumnQualifier(columnToDelete, index);
                                             // If index requires this column for its pk, then drop it
-                                            if (indexColumns.contains(new ColumnReference(columnToDelete.getFamilyName().getBytes(), columnToDelete.getName().getBytes()))) {
+                                            if (indexColumns.contains(new ColumnReference(columnToDelete.getFamilyName().getBytes(), cq))) {
                                                 // Since we're dropping the index, lock it to ensure
                                                 // that a change in index state doesn't
                                                 // occur while we're dropping it.
@@ -2858,7 +2874,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         tableMetadata = new ArrayList<Mutation>(tableMetadata);
                         // insert an empty KV to trigger time stamp update on data table row
                         Put p = new Put(dataTableKey);
-                        p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                        // Decide which column qualifier to use for the empty key value.
+                        PTable currentTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
+                        Pair<byte[], byte[]> emptyKeyValuePair = SchemaUtil.getEmptyKeyValueInfo(currentTable);
+                        p.add(TABLE_FAMILY_BYTES, emptyKeyValuePair.getFirst(), timeStamp, emptyKeyValuePair.getSecond());
                         tableMetadata.add(p);
                     }
                     boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable;
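
MetaDataEndpointImpl now reads two new SYSTEM.CATALOG cells: COLUMN_QUALIFIER (an int per column) and STORAGE_SCHEME (a tinyint per table). Both can be absent for metadata written before this change, so the decode must be null-safe, which is why the storage-scheme decode above is guarded. A minimal sketch of the pattern, with ByteBuffer standing in for the PInteger/PTinyint codecs, is:

    import java.nio.ByteBuffer;

    // Sketch: decode an optional catalog cell value, returning null when the cell is absent.
    final class CatalogCellSketch {
        static Integer decodeOptionalInt(byte[] cellValue) {          // e.g. COLUMN_QUALIFIER
            return cellValue == null ? null : ByteBuffer.wrap(cellValue).getInt();
        }

        static Byte decodeOptionalTinyint(byte[] cellValue) {         // e.g. the serialized StorageScheme
            return cellValue == null ? null : cellValue[0];
        }
    }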

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
index 4bc1ef5..d5fd96c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java
@@ -80,6 +80,7 @@ public class SequenceRegionObserver extends BaseRegionObserver {
     public static final String NUM_TO_ALLOCATE = "NUM_TO_ALLOCATE";
     private static final byte[] SUCCESS_VALUE = PInteger.INSTANCE.toBytes(Integer.valueOf(Sequence.SUCCESS));
     
+    //TODO: samarth verify that it is ok to send non-encoded empty column here. Probably is.
     private static Result getErrorResult(byte[] row, long timestamp, int errorCode) {
         byte[] errorCodeBuf = new byte[PInteger.INSTANCE.getByteSize()];
         PInteger.INSTANCE.getCodec().encodeInt(errorCode, errorCodeBuf, 0);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 5481910..3bdcee3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -231,6 +231,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] deleteCQ = null;
         byte[] deleteCF = null;
         byte[] emptyCF = null;
+        byte[] emptyKVQualifier = null;
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         if (upsertSelectTable != null) {
             isUpsert = true;
@@ -245,6 +246,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
             }
             emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+            emptyKVQualifier = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER);
         }
         TupleProjector tupleProjector = null;
         HRegion dataRegion = null;
@@ -495,8 +497,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     if (!timeStamps.contains(kvts)) {
                                         Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
                                             kv.getRowLength());
-                                        put.add(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
-                                            ByteUtil.EMPTY_BYTE_ARRAY);
+                                        // FIXME: Use the right byte array value. Transactional tables can't
+                                        // have empty byte arrays since Tephra sees them as delete markers.
+                                        put.add(emptyCF, emptyKVQualifier != null ? emptyKVQualifier
+                                                : QueryConstants.EMPTY_COLUMN_BYTES, kvts, ByteUtil.EMPTY_BYTE_ARRAY);
                                         mutations.add(put);
                                     }
                                 }
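
The empty-column-family maintenance above now prefers the EMPTY_COLUMN_QUALIFIER scan attribute and only falls back to QueryConstants.EMPTY_COLUMN_BYTES when the attribute is missing (for example, a request from an older client). A sketch of that fallback, with "_0" as a stand-in for the default qualifier:

    import java.nio.charset.StandardCharsets;

    // Sketch of the qualifier fallback used when building the empty-key-value Put.
    final class EmptyQualifierFallbackSketch {
        private static final byte[] DEFAULT_EMPTY_QUALIFIER = "_0".getBytes(StandardCharsets.UTF_8);

        static byte[] emptyColumnQualifier(byte[] fromScanAttribute) {
            return fromScanAttribute != null ? fromScanAttribute : DEFAULT_EMPTY_QUALIFIER;
        }
    }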

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 9fdfe51..83a7db9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -269,6 +269,16 @@ public final class PTableProtos {
      * <code>optional bool isDynamic = 14;</code>
      */
     boolean getIsDynamic();
+
+    // optional int32 columnQualifier = 15;
+    /**
+     * <code>optional int32 columnQualifier = 15;</code>
+     */
+    boolean hasColumnQualifier();
+    /**
+     * <code>optional int32 columnQualifier = 15;</code>
+     */
+    int getColumnQualifier();
   }
   /**
    * Protobuf type {@code PColumn}
@@ -391,6 +401,11 @@ public final class PTableProtos {
               isDynamic_ = input.readBool();
               break;
             }
+            case 120: {
+              bitField0_ |= 0x00004000;
+              columnQualifier_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -709,6 +724,22 @@ public final class PTableProtos {
       return isDynamic_;
     }
 
+    // optional int32 columnQualifier = 15;
+    public static final int COLUMNQUALIFIER_FIELD_NUMBER = 15;
+    private int columnQualifier_;
+    /**
+     * <code>optional int32 columnQualifier = 15;</code>
+     */
+    public boolean hasColumnQualifier() {
+      return ((bitField0_ & 0x00004000) == 0x00004000);
+    }
+    /**
+     * <code>optional int32 columnQualifier = 15;</code>
+     */
+    public int getColumnQualifier() {
+      return columnQualifier_;
+    }
+
     private void initFields() {
       columnNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       familyNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -724,6 +755,7 @@ public final class PTableProtos {
       expression_ = "";
       isRowTimestamp_ = false;
       isDynamic_ = false;
+      columnQualifier_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -799,6 +831,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x00002000) == 0x00002000)) {
         output.writeBool(14, isDynamic_);
       }
+      if (((bitField0_ & 0x00004000) == 0x00004000)) {
+        output.writeInt32(15, columnQualifier_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -864,6 +899,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(14, isDynamic_);
       }
+      if (((bitField0_ & 0x00004000) == 0x00004000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(15, columnQualifier_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -957,6 +996,11 @@ public final class PTableProtos {
         result = result && (getIsDynamic()
             == other.getIsDynamic());
       }
+      result = result && (hasColumnQualifier() == other.hasColumnQualifier());
+      if (hasColumnQualifier()) {
+        result = result && (getColumnQualifier()
+            == other.getColumnQualifier());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1026,6 +1070,10 @@ public final class PTableProtos {
         hash = (37 * hash) + ISDYNAMIC_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getIsDynamic());
       }
+      if (hasColumnQualifier()) {
+        hash = (37 * hash) + COLUMNQUALIFIER_FIELD_NUMBER;
+        hash = (53 * hash) + getColumnQualifier();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1163,6 +1211,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x00001000);
         isDynamic_ = false;
         bitField0_ = (bitField0_ & ~0x00002000);
+        columnQualifier_ = 0;
+        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }
 
@@ -1247,6 +1297,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x00002000;
         }
         result.isDynamic_ = isDynamic_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00004000;
+        }
+        result.columnQualifier_ = columnQualifier_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1309,6 +1363,9 @@ public final class PTableProtos {
         if (other.hasIsDynamic()) {
           setIsDynamic(other.getIsDynamic());
         }
+        if (other.hasColumnQualifier()) {
+          setColumnQualifier(other.getColumnQualifier());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1909,6 +1966,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional int32 columnQualifier = 15;
+      private int columnQualifier_ ;
+      /**
+       * <code>optional int32 columnQualifier = 15;</code>
+       */
+      public boolean hasColumnQualifier() {
+        return ((bitField0_ & 0x00004000) == 0x00004000);
+      }
+      /**
+       * <code>optional int32 columnQualifier = 15;</code>
+       */
+      public int getColumnQualifier() {
+        return columnQualifier_;
+      }
+      /**
+       * <code>optional int32 columnQualifier = 15;</code>
+       */
+      public Builder setColumnQualifier(int value) {
+        bitField0_ |= 0x00004000;
+        columnQualifier_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 columnQualifier = 15;</code>
+       */
+      public Builder clearColumnQualifier() {
+        bitField0_ = (bitField0_ & ~0x00004000);
+        columnQualifier_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PColumn)
     }
 
@@ -3338,6 +3428,16 @@ public final class PTableProtos {
      * <code>optional int64 indexDisableTimestamp = 29;</code>
      */
     long getIndexDisableTimestamp();
+
+    // optional bytes storageScheme = 30;
+    /**
+     * <code>optional bytes storageScheme = 30;</code>
+     */
+    boolean hasStorageScheme();
+    /**
+     * <code>optional bytes storageScheme = 30;</code>
+     */
+    com.google.protobuf.ByteString getStorageScheme();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3553,6 +3653,11 @@ public final class PTableProtos {
               indexDisableTimestamp_ = input.readInt64();
               break;
             }
+            case 242: {
+              bitField0_ |= 0x02000000;
+              storageScheme_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4163,6 +4268,22 @@ public final class PTableProtos {
       return indexDisableTimestamp_;
     }
 
+    // optional bytes storageScheme = 30;
+    public static final int STORAGESCHEME_FIELD_NUMBER = 30;
+    private com.google.protobuf.ByteString storageScheme_;
+    /**
+     * <code>optional bytes storageScheme = 30;</code>
+     */
+    public boolean hasStorageScheme() {
+      return ((bitField0_ & 0x02000000) == 0x02000000);
+    }
+    /**
+     * <code>optional bytes storageScheme = 30;</code>
+     */
+    public com.google.protobuf.ByteString getStorageScheme() {
+      return storageScheme_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4193,6 +4314,7 @@ public final class PTableProtos {
       transactional_ = false;
       updateCacheFrequency_ = 0L;
       indexDisableTimestamp_ = 0L;
+      storageScheme_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4347,6 +4469,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x01000000) == 0x01000000)) {
         output.writeInt64(29, indexDisableTimestamp_);
       }
+      if (((bitField0_ & 0x02000000) == 0x02000000)) {
+        output.writeBytes(30, storageScheme_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4477,6 +4602,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(29, indexDisableTimestamp_);
       }
+      if (((bitField0_ & 0x02000000) == 0x02000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(30, storageScheme_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4633,6 +4762,11 @@ public final class PTableProtos {
         result = result && (getIndexDisableTimestamp()
             == other.getIndexDisableTimestamp());
       }
+      result = result && (hasStorageScheme() == other.hasStorageScheme());
+      if (hasStorageScheme()) {
+        result = result && getStorageScheme()
+            .equals(other.getStorageScheme());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4762,6 +4896,10 @@ public final class PTableProtos {
         hash = (37 * hash) + INDEXDISABLETIMESTAMP_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getIndexDisableTimestamp());
       }
+      if (hasStorageScheme()) {
+        hash = (37 * hash) + STORAGESCHEME_FIELD_NUMBER;
+        hash = (53 * hash) + getStorageScheme().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4944,6 +5082,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x08000000);
         indexDisableTimestamp_ = 0L;
         bitField0_ = (bitField0_ & ~0x10000000);
+        storageScheme_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x20000000);
         return this;
       }
 
@@ -5104,6 +5244,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x01000000;
         }
         result.indexDisableTimestamp_ = indexDisableTimestamp_;
+        if (((from_bitField0_ & 0x20000000) == 0x20000000)) {
+          to_bitField0_ |= 0x02000000;
+        }
+        result.storageScheme_ = storageScheme_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5285,6 +5429,9 @@ public final class PTableProtos {
         if (other.hasIndexDisableTimestamp()) {
           setIndexDisableTimestamp(other.getIndexDisableTimestamp());
         }
+        if (other.hasStorageScheme()) {
+          setStorageScheme(other.getStorageScheme());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -7054,6 +7201,42 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional bytes storageScheme = 30;
+      private com.google.protobuf.ByteString storageScheme_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes storageScheme = 30;</code>
+       */
+      public boolean hasStorageScheme() {
+        return ((bitField0_ & 0x20000000) == 0x20000000);
+      }
+      /**
+       * <code>optional bytes storageScheme = 30;</code>
+       */
+      public com.google.protobuf.ByteString getStorageScheme() {
+        return storageScheme_;
+      }
+      /**
+       * <code>optional bytes storageScheme = 30;</code>
+       */
+      public Builder setStorageScheme(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x20000000;
+        storageScheme_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes storageScheme = 30;</code>
+       */
+      public Builder clearStorageScheme() {
+        bitField0_ = (bitField0_ & ~0x20000000);
+        storageScheme_ = getDefaultInstance().getStorageScheme();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -7089,7 +7272,7 @@ public final class PTableProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\014PTable.proto\032\021PGuidePosts.proto\"\246\002\n\007PC" +
+      "\n\014PTable.proto\032\021PGuidePosts.proto\"\277\002\n\007PC" +
       "olumn\022\027\n\017columnNameBytes\030\001 \002(\014\022\027\n\017family" +
       "NameBytes\030\002 \001(\014\022\020\n\010dataType\030\003 \002(\t\022\021\n\tmax" +
       "Length\030\004 \001(\005\022\r\n\005scale\030\005 \001(\005\022\020\n\010nullable\030" +
@@ -7097,32 +7280,33 @@ public final class PTableProtos {
       "(\005\022\021\n\tarraySize\030\t \001(\005\022\024\n\014viewConstant\030\n " +
       "\001(\014\022\026\n\016viewReferenced\030\013 \001(\010\022\022\n\nexpressio" +
       "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\022\021\n\tisDyn" +
-      "amic\030\016 \001(\010\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022" +
-      "\016\n\006values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003",
-      " \001(\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePost" +
-      "sCount\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGui" +
-      "dePosts\"\303\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " +
-      "\002(\014\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType" +
-      "\030\003 \002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022" +
-      "\026\n\016sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002" +
-      "(\003\022\023\n\013pkNameBytes\030\007 \001(\014\022\021\n\tbucketNum\030\010 \002" +
-      "(\005\022\031\n\007columns\030\t \003(\0132\010.PColumn\022\030\n\007indexes" +
-      "\030\n \003(\0132\007.PTable\022\027\n\017isImmutableRows\030\013 \002(\010" +
-      "\022 \n\nguidePosts\030\014 \003(\0132\014.PTableStats\022\032\n\022da",
-      "taTableNameBytes\030\r \001(\014\022\031\n\021defaultFamilyN" +
-      "ame\030\016 \001(\014\022\022\n\ndisableWAL\030\017 \002(\010\022\023\n\013multiTe" +
-      "nant\030\020 \002(\010\022\020\n\010viewType\030\021 \001(\014\022\025\n\rviewStat" +
-      "ement\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010te" +
-      "nantId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tind" +
-      "exType\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\n" +
-      "storeNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(" +
-      "\005\022\036\n\026rowKeyOrderOptimizable\030\032 \001(\010\022\025\n\rtra" +
-      "nsactional\030\033 \001(\010\022\034\n\024updateCacheFrequency" +
-      "\030\034 \001(\003\022\035\n\025indexDisableTimestamp\030\035 \001(\003*A\n",
-      "\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE" +
-      "W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p" +
-      "hoenix.coprocessor.generatedB\014PTableProt" +
-      "osH\001\210\001\001\240\001\001"
+      "amic\030\016 \001(\010\022\027\n\017columnQualifier\030\017 \001(\005\"\232\001\n\013" +
+      "PTableStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 \003(\014",
+      "\022\033\n\023guidePostsByteCount\030\003 \001(\003\022\025\n\rkeyByte" +
+      "sCount\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001(\005\022!\n" +
+      "\013pGuidePosts\030\006 \001(\0132\014.PGuidePosts\"\332\005\n\006PTa" +
+      "ble\022\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tableNam" +
+      "eBytes\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.PTable" +
+      "Type\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenceNumb" +
+      "er\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013pkNameByt" +
+      "es\030\007 \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022\031\n\007columns\030\t" +
+      " \003(\0132\010.PColumn\022\030\n\007indexes\030\n \003(\0132\007.PTable" +
+      "\022\027\n\017isImmutableRows\030\013 \002(\010\022 \n\nguidePosts\030",
+      "\014 \003(\0132\014.PTableStats\022\032\n\022dataTableNameByte" +
+      "s\030\r \001(\014\022\031\n\021defaultFamilyName\030\016 \001(\014\022\022\n\ndi" +
+      "sableWAL\030\017 \002(\010\022\023\n\013multiTenant\030\020 \002(\010\022\020\n\010v" +
+      "iewType\030\021 \001(\014\022\025\n\rviewStatement\030\022 \001(\014\022\025\n\r" +
+      "physicalNames\030\023 \003(\014\022\020\n\010tenantId\030\024 \001(\014\022\023\n" +
+      "\013viewIndexId\030\025 \001(\005\022\021\n\tindexType\030\026 \001(\014\022\026\n" +
+      "\016statsTimeStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 \001(" +
+      "\010\022\027\n\017baseColumnCount\030\031 \001(\005\022\036\n\026rowKeyOrde" +
+      "rOptimizable\030\032 \001(\010\022\025\n\rtransactional\030\033 \001(" +
+      "\010\022\034\n\024updateCacheFrequency\030\034 \001(\003\022\035\n\025index",
+      "DisableTimestamp\030\035 \001(\003\022\025\n\rstorageScheme\030" +
+      "\036 \001(\014*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020" +
+      "\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org." +
+      "apache.phoenix.coprocessor.generatedB\014PT" +
+      "ableProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7134,7 +7318,7 @@ public final class PTableProtos {
           internal_static_PColumn_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PColumn_descriptor,
-              new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", "IsDynamic", });
+              new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", "IsDynamic", "ColumnQualifier", });
           internal_static_PTableStats_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_PTableStats_fieldAccessorTable = new
@@ -7146,7 +7330,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", });
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", "UpdateCacheFrequency", "IndexDisableTimestamp", "StorageScheme", });
           return null;
         }
       };
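The descriptor changes above add an optional int32 columnQualifier = 15 to PColumn and an optional bytes storageScheme = 30 to PTable, with matching setStorageScheme/clearStorageScheme builder methods. A minimal usage sketch against the regenerated builder follows; the helper name is hypothetical and it assumes the bytes carry a serialized storage scheme value:

    import com.google.protobuf.ByteString;

    import org.apache.phoenix.coprocessor.generated.PTableProtos;

    final class StorageSchemeProtoSketch {
        // Sets the new optional bytes storageScheme = 30 field when a serialized
        // scheme is available, otherwise clears it back to the proto default.
        static PTableProtos.PTable.Builder applyStorageScheme(
                PTableProtos.PTable.Builder builder, byte[] serializedScheme) {
            return serializedScheme == null
                    ? builder.clearStorageScheme()
                    : builder.setStorageScheme(ByteString.copyFrom(serializedScheme));
        }
    }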

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index cc8f7a5..71af992 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -767,7 +767,7 @@ public class MutationState implements SQLCloseable {
                 }
                 for (PColumn column : columns) {
                     if (column != null) {
-                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString());
+                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(column.getName().getString());
                     }
                 }
             }
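The hunk above tracks the PColumnFamily API rename from getColumn(String) to getPColumnForColumnName(String). A minimal sketch of the renamed lookup, mirroring the validation loop above (the helper name and the broad SQLException throws clause are assumptions):

    import java.sql.SQLException;

    import org.apache.phoenix.schema.PColumn;
    import org.apache.phoenix.schema.PTable;

    final class ColumnResolutionSketch {
        // Throws if either the column family or the column itself no longer
        // resolves on the freshly resolved table, just like the loop above.
        static void assertStillResolves(PTable resolvedTable, PColumn column)
                throws SQLException {
            resolvedTable.getColumnFamily(column.getFamilyName().getString())
                    .getPColumnForColumnName(column.getName().getString());
        }
    }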

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
index 4b5fdbb..163cc72 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java
@@ -46,8 +46,8 @@ public class KeyValueColumnExpression extends ColumnExpression {
     public KeyValueColumnExpression() {
     }
 
-    public KeyValueColumnExpression(PColumn column) {
-        this(column, null);
+    public KeyValueColumnExpression(PColumn column, boolean encodedColumnName) {
+        this(column, null, encodedColumnName);
     }
 
     public KeyValueColumnExpression(PDatum column, byte[] cf, byte[] cq) {
@@ -56,18 +56,19 @@ public class KeyValueColumnExpression extends ColumnExpression {
         this.cq = cq;
     }
 
-    public KeyValueColumnExpression(PColumn column, String displayName) {
+    public KeyValueColumnExpression(PColumn column, String displayName, boolean encodedColumnName) {
         super(column);
         this.cf = column.getFamilyName().getBytes();
-        this.cq = column.getName().getBytes();
+        this.cq = SchemaUtil.getColumnQualifier(column, encodedColumnName);
         this.displayName = displayName;
     }
 
     public byte[] getColumnFamily() {
         return cf;
     }
-
-    public byte[] getColumnName() {
+    
+    // TODO: samarth - look for the callers of this.
+    public byte[] getColumnQualifier() {
         return cq;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index 3a38dee..2744f35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -154,6 +154,7 @@ public class ProjectedColumnExpression extends ColumnExpression {
         return Determinism.PER_INVOCATION;
     }
 
+    @Override
     public ProjectedColumnExpression clone() {
         return new ProjectedColumnExpression(this.column, this.columns, this.position, this.displayName);
     }