You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2016/03/21 21:53:44 UTC

[2/3] phoenix git commit: Replace empty key value with unit tests passing

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index b8b0350..4767090 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.SchemaUtil;
 
 /**
  * When selecting specific columns in a SELECT query, this filter passes only selected columns
@@ -53,6 +53,8 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
     private byte[] emptyCFName;
     private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker;
     private Set<byte[]> conditionOnlyCfs;
+    private boolean usesEncodedColumnNames;
+    private byte[] emptyKVQualifier;
 
     public ColumnProjectionFilter() {
 
@@ -60,10 +62,12 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
 
     public ColumnProjectionFilter(byte[] emptyCFName,
             Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker,
-            Set<byte[]> conditionOnlyCfs) {
+            Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) {
         this.emptyCFName = emptyCFName;
         this.columnsTracker = columnsTracker;
         this.conditionOnlyCfs = conditionOnlyCfs;
+        this.usesEncodedColumnNames = usesEncodedColumnNames;
+        this.emptyKVQualifier = SchemaUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
     }
 
     @Override
@@ -87,6 +91,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
             familyMapSize--;
         }
         int conditionOnlyCfsSize = WritableUtils.readVInt(input);
+        usesEncodedColumnNames = conditionOnlyCfsSize > 0;
+        emptyKVQualifier = SchemaUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst();
+        conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore to the actual value.
         this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
         while (conditionOnlyCfsSize > 0) {
             this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input));
@@ -110,12 +117,13 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
                 }
             }
         }
-        // Write conditionOnlyCfs
-        WritableUtils.writeVInt(output, this.conditionOnlyCfs.size());
+        // Encode usesEncodedColumnNames in conditionOnlyCfs size.
+        WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * (usesEncodedColumnNames ? 1 : -1));
         for (byte[] f : this.conditionOnlyCfs) {
             WritableUtils.writeCompressedByteArray(output, f);
         }
-    }
+    
+}
 
     @Override
     public byte[] toByteArray() throws IOException {
@@ -153,9 +161,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
         // make sure we're not holding to any of the byte[]'s
         ptr.set(HConstants.EMPTY_BYTE_ARRAY);
         if (kvs.isEmpty()) {
-            kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName,
-                    0, this.emptyCFName.length, QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                    QueryConstants.EMPTY_COLUMN_BYTES.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
+            kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(),
+                    this.emptyCFName, 0, this.emptyCFName.length, emptyKVQualifier, 0,
+                    emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
index 569faa5..8d03797 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java
@@ -184,7 +184,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil
         ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() {
             @Override
             public Void visit(KeyValueColumnExpression expression) {
-                inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnName());
+                inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier());
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
index eaf8d35..07f7072 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java
@@ -58,7 +58,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi
             @Override
             public Void visit(KeyValueColumnExpression expression) {
                 cf = expression.getColumnFamily();
-                cq = expression.getColumnName();
+                cq = expression.getColumnQualifier();
                 return null;
             }
         };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 13ad7e5..7e6d0f6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -299,6 +299,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
     private boolean rowKeyOrderOptimizable;
+    private boolean usesEncodedColumnNames;
+    private ImmutableBytesPtr emptyKeyValueQualifierPtr;
     
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
@@ -312,14 +314,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
-
+        /* 
+         * There is nothing to prevent new indexes on existing tables to have encoded column names.
+         * Except, due to backward compatibility reasons, we aren't able to change IndexMaintainer and the state
+         * that is serialized in it. Because of this we are forced to have the indexes inherit the
+         * storage scheme of the parent data tables. 
+         */
+        this.usesEncodedColumnNames = SchemaUtil.usesEncodedColumnNames(index);
         byte[] indexTableName = index.getPhysicalName().getBytes();
         // Use this for the nDataSaltBuckets as we need this for local indexes
         // TODO: persist nDataSaltBuckets separately, but maintain b/w compat.
         Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum();
         boolean indexWALDisabled = index.isWALDisabled();
         int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1);
-//        int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0;
         int nIndexColumns = index.getColumns().size() - indexPosOffset;
         int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset;
         // number of expressions that are indexed that are not present in the row key of the data table
@@ -330,7 +337,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName);
             String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
             try {
-                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName);
+                PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName);
                 if (SchemaUtil.isPKColumn(dataColumn)) 
                     continue;
             } catch (ColumnNotFoundException e) {
@@ -404,7 +411,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 throw new RuntimeException(e); // Impossible
             }
             if ( expressionIndexCompiler.getColumnRef()!=null ) {
-            	// get the column of the data table that corresponds to this index column
+            	// get the column of the data column that corresponds to this index column
 	            PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
 	            boolean isPKColumn = SchemaUtil.isPKColumn(column);
 	            if (isPKColumn) {
@@ -429,7 +436,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             PColumnFamily family = index.getColumnFamilies().get(i);
             for (PColumn indexColumn : family.getColumns()) {
                 PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString());
-                this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), column.getName().getBytes()));
+                byte[] cq = SchemaUtil.getColumnQualifier(column, index);
+                this.coveredColumns.add(new ColumnReference(column.getFamilyName().getBytes(), cq));
             }
         }
         this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize);
@@ -749,7 +757,6 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // same for all rows in this index)
             if (!viewConstantColumnBitSet.get(i)) {
                 int pos = rowKeyMetaData.getIndexPkPosition(i-dataPosOffset);
-                Field dataField = dataRowKeySchema.getField(i);
                 indexFields[pos] = 
                         dataRowKeySchema.getField(i);
             } 
@@ -843,9 +850,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
+                this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts,
                 // set the value to the empty column name
-                QueryConstants.EMPTY_COLUMN_BYTES_PTR));
+                emptyKeyValueQualifierPtr));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
         int i = 0;
@@ -1058,6 +1065,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         
         if (isNewClient) {
             int numIndexedExpressions = WritableUtils.readVInt(input);
+            usesEncodedColumnNames = numIndexedExpressions > 0;
+            numIndexedExpressions = Math.abs(numIndexedExpressions) - 1;
             indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions);        
             for (int i = 0; i < numIndexedExpressions; i++) {
             	Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
@@ -1147,7 +1156,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength());
         output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength());
         
-        WritableUtils.writeVInt(output, indexedExpressions.size());
+        // Hack to encode usesEncodedColumnNames in indexedExpressions size.
+        int indexedExpressionsSize = (indexedExpressions.size() + 1) * (usesEncodedColumnNames ? 1 : -1);
+        WritableUtils.writeVInt(output, indexedExpressionsSize);
         for (Expression expression : indexedExpressions) {
         	WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal());
         	expression.write(output);
@@ -1204,10 +1215,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * Init calculated state reading/creating
      */
     private void initCachedState() {
-        dataEmptyKeyValueRef =
-                new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(),
-                        QueryConstants.EMPTY_COLUMN_BYTES);
-
+        dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), SchemaUtil
+                .getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst());
+        emptyKeyValueQualifierPtr = new ImmutableBytesPtr(SchemaUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst());
         indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size());
         for (ColumnReference ref : coveredColumns) {
             indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName(
@@ -1221,7 +1231,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         	KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() {
                 @Override
                 public Void visit(KeyValueColumnExpression expression) {
-                	if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) {
+                	if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) {
                 		indexedColumnTypes.add(expression.getDataType());
                 	}
                     return null;
@@ -1486,4 +1496,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             return udfParseNodes;
         }
     }
+    
+    public byte[] getEmptyKeyValueQualifier() {
+        return emptyKeyValueQualifierPtr.copyBytes();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 32dc632..6ee981a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -221,10 +221,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 Scan scan = new Scan();
                 // Project all mutable columns
                 for (ColumnReference ref : mutableColumns) {
+                    //TODO: samarth confirm this is ok
                     scan.addColumn(ref.getFamily(), ref.getQualifier());
                 }
+                // Indexes inherit the storage scheme of the data table which means all the indexes have the same
+                // storage scheme and empty key value qualifier.
+                byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+                
                 // Project empty key value column
-                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
                 ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
                 scanRanges.initializeScan(scan);
                 TableName tableName = env.getRegion().getRegionInfo().getTable();
@@ -262,7 +267,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
         if (scanner != null) {
             Result result;
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0)
+                    .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             // Process existing data table rows by removing the old index row and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
@@ -290,7 +296,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             // to generate point delete markers for all index rows that were added. We don't have Tephra
             // manage index rows in change sets because we don't want to be hit with the additional
             // memory hit and do not need to do conflict detection on index rows.
-            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
             while ((result = scanner.next()) != null) {
                 Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
                 // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c36a72b..eabaaa3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -204,7 +204,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                             // Project empty key value unless the column family containing it has
                             // been projected in its entirety.
                             if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
-                                scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                                scan.addColumn(ecf, SchemaUtil.getEmptyKeyValueInfo(table).getFirst());
                             }
                         }
                     }
@@ -297,6 +297,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         if (whereCol.getSecond() == null) {
                             scan.addFamily(family);                            
                         } else {
+                            //TODO: samarth confirm this
                             scan.addColumn(family, whereCol.getSecond());
                         }
                     }
@@ -321,7 +322,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             // the ExplicitColumnTracker not to be used, though.
             if (!statement.isAggregate() && filteredColumnNotInProjection) {
                 ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
-                        columnsTracker, conditionOnlyCfs));
+                        columnsTracker, conditionOnlyCfs, SchemaUtil.usesEncodedColumnNames(table)));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index f875a77..c11b472 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -202,6 +202,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT);
     public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP";
     public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP);
+    public static final String COLUMN_QUALIFIER = "COLUMN_QUALIFIER";
+    public static final byte[] COLUMN_QUALIFIER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER);
 
     public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
@@ -290,6 +292,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] UPDATE_CACHE_FREQUENCY_BYTES = Bytes.toBytes(UPDATE_CACHE_FREQUENCY);
 
     public static final String ASYNC_CREATED_DATE = "ASYNC_CREATED_DATE";
+    public static final String STORAGE_SCHEME = "STORAGE_SCHEME";
+    public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(STORAGE_SCHEME);
 
     private final PhoenixConnection connection;
     private final ResultSet emptyResultSet;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index ff73530..404fb06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -279,7 +279,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
         final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
 
-        job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+        job.getConfiguration().set(FormatToBytesWritableMapper.PHYSICAL_TABLE_NAMES_CONFKEY,tableNamesAsJson);
         job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
 
         // give subclasses their hook

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index 2c9b6d9..b975a03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
+
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
@@ -44,12 +45,12 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +87,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
 
     /** Configuration key for the table names */
-    public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+    public static final String PHYSICAL_TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
 
     /** Configuration key for the table logical names */
     public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
@@ -101,8 +102,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     protected PhoenixConnection conn;
     protected UpsertExecutor<RECORD, ?> upsertExecutor;
     protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
-    protected List<String> tableNames;
-    protected List<String> logicalNames;
+    //TODO: merge physicalTableNames and logicalNames into some associative data structure since they are 1:1.
+    protected List<String> physicalTableNames;
+    protected List<Pair<String, byte[]>> logicalTables; // List of pairs of a table's logical name and the empty key value's qualifier used for it.
     protected MapperUpsertListener<RECORD> upsertListener;
 
     /*
@@ -127,11 +129,14 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
         try {
             conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
 
-            final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+            final String tableNamesConf = conf.get(PHYSICAL_TABLE_NAMES_CONFKEY);
             final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
-            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
+            physicalTableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+            List<String> logicalTableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+            for (String logicalTableName : logicalTableNames) {
+                PTable table = PhoenixRuntime.getTable(conn, logicalTableName);
+                logicalTables.add(new Pair<>(logicalTableName, SchemaUtil.getEmptyKeyValueInfo(table).getFirst()));
+            }
             columnIndexes = initColumnIndexes();
         } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
@@ -143,7 +148,6 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
         preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     protected void map(LongWritable key, Text value, Context context) throws IOException,
             InterruptedException {
@@ -173,8 +177,8 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
                 keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
                 byte[] first = kvPair.getFirst();
                 // Create a list of KV for each table
-                for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                for (int i = 0; i < physicalTableNames.size(); i++) {
+                    if (Bytes.compareTo(Bytes.toBytes(physicalTableNames.get(i)), first) == 0) {
                         if (!map.containsKey(i)) {
                             map.put(i, new ArrayList<KeyValue>());
                         }
@@ -201,8 +205,8 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
     private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
         List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
         int tableIndex;
-        for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
-            PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
+        for (tableIndex = 0; tableIndex < physicalTableNames.size(); tableIndex++) {
+            PTable table = PhoenixRuntime.getTable(conn, logicalTables.get(tableIndex).getFirst());
             Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
             List<PColumn> cls = table.getColumns();
             for (int i = 0; i < cls.size(); i++) {
@@ -247,7 +251,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
      * Collect all column values for the same rowKey
      *
      * @param context    Current mapper context
-     * @param tableIndex Table index in tableNames list
+     * @param tableIndex Table index in tableNames or logicalTables list
      * @param lkv        List of KV values that will be combined in a single ImmutableBytesWritable
      * @throws IOException
      * @throws InterruptedException
@@ -258,12 +262,14 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
         DataOutputStream outputStream = new DataOutputStream(bos);
         ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+        String tableName = physicalTableNames.get(tableIndex);
+        Pair<String, byte[]> logicalTable = logicalTables.get(tableIndex);
         if (!lkv.isEmpty()) {
             // All Key Values for the same row are supposed to be the same, so init rowKey only once
             Cell first = lkv.get(0);
             outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
             for (KeyValue cell : lkv) {
-                if (isEmptyCell(cell)) {
+                if (isEmptyCell(cell, logicalTable.getSecond())) {
                     continue;
                 }
                 int i = findIndex(tableIndex, cell);
@@ -278,13 +284,13 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
         }
         ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
         outputStream.close();
-        context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
+        context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
     }
 
-    protected boolean isEmptyCell(KeyValue cell) {
+    protected boolean isEmptyCell(KeyValue cell, byte[] emptyKVQualifier) {
         if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
-                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
+                cell.getQualifierLength(), emptyKVQualifier, 0,
+                emptyKVQualifier.length) != 0)
             return false;
         else
             return true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index e906431..04fe959 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -80,7 +80,7 @@ public class FormatToKeyValueReducer
         try {
             PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
             builder = conn.getKeyValueBuilder();
-            final String tableNamesConf = conf.get(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY);
+            final String tableNamesConf = conf.get(FormatToBytesWritableMapper.PHYSICAL_TABLE_NAMES_CONFKEY);
             final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
             tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
             logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
@@ -98,16 +98,16 @@ public class FormatToKeyValueReducer
             PTable table = PhoenixRuntime.getTable(conn, tableName);
             emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
             List<PColumn> cls = table.getColumns();
-            List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
+            List<Pair<byte[], byte[]>> list = new ArrayList<>(cls.size());
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
-                if (c.getFamilyName() == null) {
+                if (SchemaUtil.isPKColumn(c)) {
                     list.add(null); // Skip PK column
                     continue;
                 }
                 byte[] family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                list.add(new Pair(family, name));
+                byte[] name = SchemaUtil.getColumnQualifier(c, table);
+                list.add(new Pair<>(family, name));
             }
             columnIndexes.add(list);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index a8595e9..a9f1e87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -569,6 +569,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
     
+    @Override
     public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
     	synchronized (latestMetaDataLock) {
             throwConnectionClosedIfNullMetaData();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 471ac73..adbbdfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -29,6 +29,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG;
@@ -82,6 +83,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
@@ -115,6 +117,7 @@ import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 
 
@@ -176,11 +179,18 @@ public interface QueryConstants {
     public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME);
     public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_BYTES);
+    public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0;
+    public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = PInteger.INSTANCE.toBytes(ENCODED_EMPTY_COLUMN_NAME);
+    public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr(
+            ENCODED_EMPTY_COLUMN_BYTES);
     public final static String EMPTY_COLUMN_VALUE = "x";
     public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
     public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
             EMPTY_COLUMN_VALUE_BYTES);
-
+    public static final String ENCODED_EMPTY_COLUMN_VALUE = EMPTY_COLUMN_VALUE;
+    public final static byte[] ENCODED_EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE);
+    public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr(
+            ENCODED_EMPTY_COLUMN_VALUE_BYTES);
     public static final String DEFAULT_COLUMN_FAMILY = "0";
     public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
     public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
@@ -259,6 +269,8 @@ public interface QueryConstants {
             IS_ROW_TIMESTAMP + " BOOLEAN, " +
             TRANSACTIONAL + " BOOLEAN," +
             UPDATE_CACHE_FREQUENCY + " BIGINT," +
+            COLUMN_QUALIFIER + " INTEGER," +
+            STORAGE_SCHEME + " TINYINT, " + 
             "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + ","
             + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" +
             HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 76f6218..d7c6456 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -45,7 +45,7 @@ public class ColumnRef {
     }
 
     public ColumnRef(TableRef tableRef, String familyName, String columnName) throws MetaDataEntityNotFoundException {
-        this(tableRef, tableRef.getTable().getColumnFamily(familyName).getColumn(columnName).getPosition());
+        this(tableRef, tableRef.getTable().getColumnFamily(familyName).getPColumnForColumnName(columnName).getPosition());
     }
 
     public ColumnRef(TableRef tableRef, int columnPosition) {
@@ -109,7 +109,7 @@ public class ColumnRef {
         	return new ProjectedColumnExpression(column, table, displayName);
         }
        
-        return new KeyValueColumnExpression(column, displayName);
+        return new KeyValueColumnExpression(column, displayName, SchemaUtil.usesEncodedColumnNames(table));
     }
 
     public ColumnRef cloneAtTimestamp(long timestamp) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
index a60229e..65e362c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java
@@ -90,4 +90,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn {
 	public boolean isDynamic() {
 		return getDelegate().isDynamic();
 	}
+
+    @Override
+    public Integer getColumnQualifier() {
+        return getDelegate().getColumnQualifier();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index b294f03..de72c48 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -97,8 +97,8 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException {
-        return delegate.getColumn(name);
+    public PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException {
+        return delegate.getPColumnForColumnName(name);
     }
 
     @Override
@@ -271,4 +271,14 @@ public class DelegateTable implements PTable {
     public long getUpdateCacheFrequency() {
         return delegate.getUpdateCacheFrequency();
     }
+
+    @Override
+    public StorageScheme getStorageScheme() {
+        return delegate.getStorageScheme();
+    }
+
+    @Override
+    public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException {
+        return delegate.getPColumnForColumnQualifier(cq);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 7f3f850..455e2b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -33,6 +33,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
@@ -65,6 +66,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
@@ -180,6 +182,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
+import org.apache.phoenix.schema.PTable.StorageScheme;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.stats.PTableStats;
 import org.apache.phoenix.schema.types.PDataType;
@@ -248,8 +251,9 @@ public class MetaDataClient {
             STORE_NULLS + "," +
             BASE_COLUMN_COUNT + "," +
             TRANSACTIONAL + "," +
-            UPDATE_CACHE_FREQUENCY +
-            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+            UPDATE_CACHE_FREQUENCY + ", " +
+            STORAGE_SCHEME +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String CREATE_LINK =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
             TENANT_ID + "," +
@@ -321,8 +325,9 @@ public class MetaDataClient {
         PK_NAME + "," +  // write this both in the column and table rows for access by metadata APIs
         KEY_SEQ + "," +
         COLUMN_DEF + "," +
-        IS_ROW_TIMESTAMP + 
-        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+        COLUMN_QUALIFIER + "," +
+        IS_ROW_TIMESTAMP +
+        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String INSERT_COLUMN_ALTER_TABLE =
             "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
             TENANT_ID + "," +
@@ -342,8 +347,9 @@ public class MetaDataClient {
             IS_VIEW_REFERENCED + "," +
             PK_NAME + "," +  // write this both in the column and table rows for access by metadata APIs
             KEY_SEQ + "," +
-            COLUMN_DEF +
-            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+            COLUMN_DEF + ", " +
+            COLUMN_QUALIFIER +
+            ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     private static final String UPDATE_COLUMN_POSITION =
         "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\" ( " +
         TENANT_ID + "," +
@@ -670,10 +676,10 @@ public class MetaDataClient {
                     byte[] cf= colRef.getFamily();
                     byte[] cq= colRef.getQualifier();
                     if (cf!=null) {
-                        view.getColumnFamily(cf).getColumn(cq);
+                        view.getColumnFamily(cf).getPColumnForColumnQualifier(cq);
                     }
                     else {
-                        view.getColumn( Bytes.toString(cq));
+                        view.getPColumnForColumnQualifier(cq);
                     }
                 } catch (ColumnNotFoundException e) { // Ignore this index and continue with others
                     containsAllReqdCols = false;
@@ -689,7 +695,7 @@ public class MetaDataClient {
                         // but the WHERE clause for the view statement (which is added to the index below)
                         // would fail to compile.
                         String indexColumnName = IndexUtil.getIndexColumnName(col);
-                        index.getColumn(indexColumnName);
+                        index.getPColumnForColumnName(indexColumnName);
                     } catch (ColumnNotFoundException e) { // Ignore this index and continue with others
                         containsAllReqdCols = false;
                         break;
@@ -747,8 +753,13 @@ public class MetaDataClient {
         } else {
             colUpsert.setString(18, column.getExpressionStr());
         }
-        if (colUpsert.getParameterMetaData().getParameterCount() > 18) {
-            colUpsert.setBoolean(19, column.isRowTimestamp());
+        if (column.getColumnQualifier() == null) {
+            colUpsert.setNull(19, Types.INTEGER);
+        } else {
+            colUpsert.setInt(19, column.getColumnQualifier());
+        }
+        if (colUpsert.getParameterMetaData().getParameterCount() > 19) {
+            colUpsert.setBoolean(20, column.isRowTimestamp());
         }
         colUpsert.execute();
     }
@@ -767,7 +778,7 @@ public class MetaDataClient {
         argUpsert.execute();
     }
 
-    private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK) throws SQLException {
+    private PColumn newColumn(int position, ColumnDef def, PrimaryKeyConstraint pkConstraint, String defaultColumnFamily, boolean addingToPK, Map<String, Integer> nextColumnQualifiers) throws SQLException {
         try {
             ColumnName columnDefName = def.getColumnDefName();
             SortOrder sortOrder = def.getSortOrder();
@@ -815,15 +826,23 @@ public class MetaDataClient {
                 }
                 isNull = false;
             }
-
+            Integer columnQualifier = null;
+            if (!isPK && nextColumnQualifiers != null) {
+                columnQualifier = nextColumnQualifiers.get(familyName.getString());
+                if (columnQualifier == null) {
+                    // We use columnQualifier 0 for the special empty key value.
+                    columnQualifier = 1;
+                }
+                nextColumnQualifiers.put(familyName.toString(), columnQualifier + 1);
+            }
             PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(),
-                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false);
+                    def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false, columnQualifier);
             return column;
         } catch (IllegalArgumentException e) { // Based on precondition check in constructor
             throw new SQLException(e);
         }
     }
-
+    
     public MutationState createTable(CreateTableStatement statement, byte[][] splits, PTable parent, String viewStatement, ViewType viewType, byte[][] viewColumnConstants, BitSet isViewColumnReferenced) throws SQLException {
         PTable table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, null, null, null);
         if (table == null || table.getType() == PTableType.VIEW || table.isTransactional()) {
@@ -1683,7 +1702,7 @@ public class MetaDataClient {
                 .build().buildException();
             }
             // can't create a transactional table if it has a row timestamp column
-            if (pkConstraint.getNumColumnsWithRowTimestamp()>0 && transactional) {
+            if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && transactional) {
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP)
                 .setSchemaName(schemaName).setTableName(tableName)
                 .build().buildException();
@@ -1843,6 +1862,39 @@ public class MetaDataClient {
             int pkPositionOffset = pkColumns.size();
             int position = positionOffset;
             
+            StorageScheme storageScheme = null;
+            Map<String, Integer> nextColumnQualifiers = null; // this would be null for tables created before phoenix 4.8.
+            if (SchemaUtil.isSystemTable(Bytes.toBytes(SchemaUtil.getTableName(schemaName, tableName)))) {
+                // System tables have hard-coded column qualifiers. So we can't use column encoding for them.
+                storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES;
+            } else if (parent != null && tableType == PTableType.VIEW) {
+                // We can't control what column qualifiers are used in HTable mapped to Phoenix views. So
+                // we are not able to encode column names.
+                if (viewType != null && viewType == MAPPED) {
+                    storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES;
+                } else {
+                    // for regular phoenix views, use the storage scheme of the parent since they all share the parent's
+                    // HTable.
+                    storageScheme = parent.getStorageScheme();
+                    if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) {
+                        nextColumnQualifiers  = SchemaUtil.getNextColumnQualifiers(parent);
+                    }
+                }
+            } else if (parent != null && tableType == PTableType.INDEX) {
+                // New indexes on existing tables can have encoded column names. But unfortunately, due to 
+                // backward compatibility reasons, we aren't able to change IndexMaintainer and the state
+                // that is serialized in it. Because of this we are forced to have the indexes inherit the
+                // storage scheme of the parent data tables.
+                storageScheme = parent.getStorageScheme();
+                if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) {
+                    nextColumnQualifiers  = SchemaUtil.getNextColumnQualifiers(parent);
+                }
+            } else {
+                // we always attempt to create tables with encoded column names.
+                storageScheme = StorageScheme.ENCODED_COLUMN_NAMES;
+                nextColumnQualifiers = Maps.newHashMapWithExpectedSize(colDefs.size() - pkColumns.size());
+            }
+            
             for (ColumnDef colDef : colDefs) {
                 rowTimeStampColumnAlreadyFound = checkAndValidateRowTimestampCol(colDef, pkConstraint, rowTimeStampColumnAlreadyFound, tableType);
                 if (colDef.isPK()) { // i.e. the column is declared as CREATE TABLE COLNAME DATATYPE PRIMARY KEY...
@@ -1861,7 +1913,7 @@ public class MetaDataClient {
                                 .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                     }
                 }
-                PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false);
+                PColumn column = newColumn(position++, colDef, pkConstraint, defaultFamilyName, false, nextColumnQualifiers);
                 if (SchemaUtil.isPKColumn(column)) {
                     // TODO: remove this constraint?
                     if (pkColumnsIterator.hasNext() && !column.getName().getString().equals(pkColumnsIterator.next().getFirst().getColumnName())) {
@@ -1972,13 +2024,14 @@ public class MetaDataClient {
             if (SchemaUtil.isMetaTable(schemaName,tableName)) {
                 // TODO: what about stats for system catalog?
                 PName newSchemaName = PNameFactory.newName(schemaName);
+                // Column names and qualifiers and hardcoded for system tables.
                 PTable table = PTableImpl.makePTable(tenantId,newSchemaName, PNameFactory.newName(tableName), tableType,
                         null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                         PNameFactory.newName(QueryConstants.SYSTEM_TABLE_PK_NAME), null, columns, null, null,
                         Collections.<PTable>emptyList(), isImmutableRows,
                         Collections.<PName>emptyList(), defaultFamilyName == null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L);
+                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true, false, 0, 0L, StorageScheme.NON_ENCODED_COLUMN_NAMES);
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -2038,7 +2091,6 @@ public class MetaDataClient {
                 tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                 connection.rollback();
             }
-
             String dataTableName = parent == null || tableType == PTableType.VIEW ? null : parent.getTableName().getString();
             PIndexState indexState = parent == null || tableType == PTableType.VIEW  ? null : PIndexState.BUILDING;
             PreparedStatement tableUpsert = connection.prepareStatement(CREATE_TABLE);
@@ -2084,6 +2136,7 @@ public class MetaDataClient {
             }
             tableUpsert.setBoolean(21, transactional);
             tableUpsert.setLong(22, updateCacheFrequency);
+            tableUpsert.setByte(23, storageScheme.getSerializedValue()); //TODO: samarth should there be a null check here?
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -2148,7 +2201,7 @@ public class MetaDataClient {
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
                         dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L);
+                        indexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, storageScheme);
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
@@ -2701,6 +2754,7 @@ public class MetaDataClient {
                 List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnDefs.size());
                 Set<String> colFamiliesForPColumnsToBeAdded = new LinkedHashSet<>();
                 Set<String> families = new LinkedHashSet<>();
+                Map<String, Integer> nextColumnQualifiers = SchemaUtil.getNextColumnQualifiers(table);
                 if (columnDefs.size() > 0 ) {
                     try (PreparedStatement colUpsert = connection.prepareStatement(INSERT_COLUMN_ALTER_TABLE)) {
                         short nextKeySeq = SchemaUtil.getMaxKeySeq(table);
@@ -2721,7 +2775,7 @@ public class MetaDataClient {
                                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY)
                                 .setColumnName(colDef.getColumnDefName().getColumnName()).build().buildException();
                             }
-                            PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true);
+                            PColumn column = newColumn(position++, colDef, PrimaryKeyConstraint.EMPTY, table.getDefaultFamilyName() == null ? null : table.getDefaultFamilyName().getString(), true, nextColumnQualifiers);
                             columns.add(column);
                             String pkName = null;
                             Short keySeq = null;
@@ -2759,7 +2813,7 @@ public class MetaDataClient {
                                         ColumnName indexColName = ColumnName.caseSensitiveColumnName(IndexUtil.getIndexColumnName(null, colDef.getColumnDefName().getColumnName()));
                                         Expression expression = new RowKeyColumnExpression(columns.get(i), new RowKeyValueAccessor(pkColumns, ++pkSlotPosition));
                                         ColumnDef indexColDef = FACTORY.columnDef(indexColName, indexColDataType.getSqlTypeName(), colDef.isNull(), colDef.getMaxLength(), colDef.getScale(), true, colDef.getSortOrder(), expression.toString(), colDef.isRowTimestamp());
-                                        PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true);
+                                        PColumn indexColumn = newColumn(indexPosition++, indexColDef, PrimaryKeyConstraint.EMPTY, null, true, nextColumnQualifiers);
                                         addColumnMutation(schemaName, index.getTableName().getString(), indexColumn, colUpsert, index.getParentTableName().getString(), index.getPKName() == null ? null : index.getPKName().getString(), ++nextIndexKeySeq, index.getBucketNum() != null);
                                     }
                                 }
@@ -3034,7 +3088,7 @@ public class MetaDataClient {
                         } 
                         else if (coveredColumns.contains(columnToDropRef)) {
                             String indexColumnName = IndexUtil.getIndexColumnName(columnToDrop);
-                            indexColumnsToDrop.add(index.getColumn(indexColumnName));
+                            indexColumnsToDrop.add(index.getPColumnForColumnName(indexColumnName));
                         }
                     }
                     if(!indexColumnsToDrop.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
index 0f5fa44..b63c97b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java
@@ -27,7 +27,7 @@ package org.apache.phoenix.schema;
 public interface PColumn extends PDatum {
 
     /**
-     * @return the name of the column qualifier
+     * @return the name of the column
      */
     PName getName();
 
@@ -60,4 +60,9 @@ public interface PColumn extends PDatum {
     boolean isRowTimestamp();
     
     boolean isDynamic();
+    
+    /**
+     * @return name of the HBase column qualifier
+     */
+    Integer getColumnQualifier();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
index 24da14d..c4c383e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamily.java
@@ -39,16 +39,22 @@ public interface PColumnFamily {
     Collection<PColumn> getColumns();
     
     /**
-     * @return The PColumn for the specified column qualifier.
+     * @return The PColumn for the specified column name.
      * @throws ColumnNotFoundException if the column cannot be found
      */
-    PColumn getColumn(byte[] qualifier) throws ColumnNotFoundException;
+    PColumn getPColumnForColumnNameBytes(byte[] columnNameBytes) throws ColumnNotFoundException;
     
     /**
-     * @return The PColumn for the specified column qualifier.
+     * @return The PColumn for the specified column name.
      * @throws ColumnNotFoundException if the column cannot be found
      */
-    PColumn getColumn(String name) throws ColumnNotFoundException;
+    PColumn getPColumnForColumnName(String columnName) throws ColumnNotFoundException;
     
     int getEstimatedSize();
+    
+    /**
+     * @return The PColumn for the specified column qualifier.
+     * @throws ColumnNotFoundException if the column cannot be found
+     */
+    PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
index 2e29656..a3d855b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnFamilyImpl.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Preconditions;
@@ -31,8 +32,9 @@ import com.google.common.collect.ImmutableSortedMap;
 public class PColumnFamilyImpl implements PColumnFamily {
     private final PName name;
     private final List<PColumn> columns;
-    private final Map<String, PColumn> columnByString;
-    private final Map<byte[], PColumn> columnByBytes;
+    private final Map<String, PColumn> columnNamesByStrings;
+    private final Map<byte[], PColumn> columnNamesByBytes;
+    private final Map<byte[], PColumn> columnQualifiersByBytes;
     private final int estimatedSize;
 
     @Override
@@ -40,22 +42,28 @@ public class PColumnFamilyImpl implements PColumnFamily {
         return estimatedSize;
     }
     
-    public PColumnFamilyImpl(PName name, List<PColumn> columns) {
+    public PColumnFamilyImpl(PName name, List<PColumn> columns, boolean useEncodedColumnNames) {
         Preconditions.checkNotNull(name);
         // Include guidePosts also in estimating the size
         long estimatedSize = SizedUtil.OBJECT_SIZE + SizedUtil.POINTER_SIZE * 5 + SizedUtil.INT_SIZE + name.getEstimatedSize() +
                 SizedUtil.sizeOfMap(columns.size()) * 2 + SizedUtil.sizeOfArrayList(columns.size());
         this.name = name;
         this.columns = ImmutableList.copyOf(columns);
-        ImmutableMap.Builder<String, PColumn> columnByStringBuilder = ImmutableMap.builder();
-        ImmutableSortedMap.Builder<byte[], PColumn> columnByBytesBuilder = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        ImmutableMap.Builder<String, PColumn> columnNamesByStringBuilder = ImmutableMap.builder();
+        ImmutableSortedMap.Builder<byte[], PColumn> columnNamesByBytesBuilder = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
+        ImmutableSortedMap.Builder<byte[], PColumn> columnQualifiersByBytesBuilder = ImmutableSortedMap.orderedBy(Bytes.BYTES_COMPARATOR);
         for (PColumn column : columns) {
             estimatedSize += column.getEstimatedSize();
-            columnByBytesBuilder.put(column.getName().getBytes(), column);
-            columnByStringBuilder.put(column.getName().getString(), column);
+            columnNamesByBytesBuilder.put(column.getName().getBytes(), column);
+            columnNamesByStringBuilder.put(column.getName().getString(), column);
+            // TODO: samarth fix this. projected columns have column family for pk columns which messes up with checks.
+            if (!useEncodedColumnNames || (useEncodedColumnNames && !(column.getColumnQualifier() == null))) {
+                columnQualifiersByBytesBuilder.put(SchemaUtil.getColumnQualifier(column, useEncodedColumnNames), column);
+            }
         }
-        this.columnByBytes = columnByBytesBuilder.build();
-        this.columnByString = columnByStringBuilder.build();
+        this.columnNamesByBytes = columnNamesByBytesBuilder.build();
+        this.columnNamesByStrings = columnNamesByStringBuilder.build();
+        this.columnQualifiersByBytes = useEncodedColumnNames ? columnQualifiersByBytesBuilder.build() : columnNamesByBytes;
         this.estimatedSize = (int)estimatedSize;
     }
     
@@ -70,19 +78,28 @@ public class PColumnFamilyImpl implements PColumnFamily {
     }
 
     @Override
-    public PColumn getColumn(byte[] qualifier) throws ColumnNotFoundException  {
-        PColumn column = columnByBytes.get(qualifier);
+    public PColumn getPColumnForColumnNameBytes(byte[] columnNameBytes) throws ColumnNotFoundException  {
+        PColumn column = columnNamesByBytes.get(columnNameBytes);
         if (column == null) {
-            throw new ColumnNotFoundException(Bytes.toString(qualifier));
+            throw new ColumnNotFoundException(Bytes.toString(columnNameBytes));
         }
         return column;
     }
     
     @Override
-    public PColumn getColumn(String name) throws ColumnNotFoundException  {
-        PColumn column = columnByString.get(name);
+    public PColumn getPColumnForColumnName(String columnName) throws ColumnNotFoundException  {
+        PColumn column = columnNamesByStrings.get(columnName);
         if (column == null) {
-            throw new ColumnNotFoundException(name);
+            throw new ColumnNotFoundException(columnName);
+        }
+        return column;
+    }
+    
+    @Override
+    public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException {
+        PColumn column = columnQualifiersByBytes.get(cq);
+        if (column == null) {
+            throw new ColumnNotFoundException(Bytes.toString(cq));
         }
         return column;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
index a556f76..5cf1465 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java
@@ -40,6 +40,7 @@ public class PColumnImpl implements PColumn {
     private String expressionStr;
     private boolean isRowTimestamp;
     private boolean isDynamic;
+    private Integer columnQualifier;
     
     public PColumnImpl() {
     }
@@ -51,13 +52,13 @@ public class PColumnImpl implements PColumn {
                        Integer scale,
                        boolean nullable,
                        int position,
-                       SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) {
-        init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic);
+                       SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic, Integer columnQualifier) {
+        init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic, columnQualifier);
     }
 
     public PColumnImpl(PColumn column, int position) {
         this(column.getName(), column.getFamilyName(), column.getDataType(), column.getMaxLength(),
-                column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic());
+                column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic(), column.getColumnQualifier());
     }
 
     private void init(PName name,
@@ -69,7 +70,7 @@ public class PColumnImpl implements PColumn {
             int position,
             SortOrder sortOrder,
             Integer arrSize,
-            byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) {
+            byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic, Integer columnQualifier) {
     	Preconditions.checkNotNull(sortOrder);
         this.dataType = dataType;
         if (familyName == null) {
@@ -94,6 +95,7 @@ public class PColumnImpl implements PColumn {
         this.expressionStr = expressionStr;
         this.isRowTimestamp = isRowTimestamp;
         this.isDynamic = isDynamic;
+        this.columnQualifier = columnQualifier;
     }
 
     @Override
@@ -205,6 +207,11 @@ public class PColumnImpl implements PColumn {
     public boolean isDynamic() {
         return isDynamic;
     }
+    
+    @Override
+    public Integer getColumnQualifier() {
+        return columnQualifier;
+    }
 
     /**
      * Create a PColumn instance from PBed PColumn instance
@@ -251,8 +258,12 @@ public class PColumnImpl implements PColumn {
         if (column.hasIsDynamic()) {
         	isDynamic = column.getIsDynamic();
         }
+        Integer columnQualifier = null;
+        if (column.hasColumnQualifier()) {
+            columnQualifier = column.getColumnQualifier();
+        }
         return new PColumnImpl(columnName, familyName, dataType, maxLength, scale, nullable, position, sortOrder,
-                arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic);
+                arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic, columnQualifier);
     }
 
     public static PTableProtos.PColumn toProto(PColumn column) {
@@ -283,6 +294,9 @@ public class PColumnImpl implements PColumn {
             builder.setExpression(column.getExpressionStr());
         }
         builder.setIsRowTimestamp(column.isRowTimestamp());
+        if (column.getColumnQualifier() != null) {
+            builder.setColumnQualifier(column.getColumnQualifier());
+        }
         return builder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 413d116..8c85ae5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -399,7 +399,7 @@ public class PMetaDataImpl implements PMetaData {
             if (familyName == null) {
                 column = table.getPKColumn(columnToRemove.getName().getString());
             } else {
-                column = table.getColumnFamily(familyName).getColumn(columnToRemove.getName().getString());
+                column = table.getColumnFamily(familyName).getPColumnForColumnName(columnToRemove.getName().getString());
             }
             int positionOffset = 0;
             int position = column.getPosition();
@@ -414,7 +414,7 @@ public class PMetaDataImpl implements PMetaData {
             // Update position of columns that follow removed column
             for (int i = position+1; i < oldColumns.size(); i++) {
                 PColumn oldColumn = oldColumns.get(i);
-                PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp(), oldColumn.isDynamic());
+                PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp(), oldColumn.isDynamic(), oldColumn.getColumnQualifier());
                 columns.add(newColumn);
             }
             

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
index 0e1337c..8df6a95 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PName.java
@@ -83,6 +83,32 @@ public interface PName {
             return 0;
         }
     };
+    public static PName ENCODED_EMPTY_COLUMN_NAME = new PName() {
+        @Override
+        public String getString() {
+            return String.valueOf(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+        }
+
+        @Override
+        public byte[] getBytes() {
+            return QueryConstants.ENCODED_EMPTY_COLUMN_BYTES;
+        }
+        
+        @Override
+        public String toString() {
+            return getString();
+        }
+
+        @Override
+        public ImmutableBytesPtr getBytesPtr() {
+            return QueryConstants.ENCODED_EMPTY_COLUMN_BYTES_PTR;
+        }
+
+        @Override
+        public int getEstimatedSize() {
+            return 0;
+        }
+    };
     /**
      * Get the client-side, normalized name as referenced
      * in a SQL statement.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42294c4b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index b2a1d58..662f114 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -154,6 +154,34 @@ public interface PTable extends PMetaDataEntity {
             return LinkType.values()[serializedValue-1];
         }
     }
+    
+    public enum StorageScheme {
+        ENCODED_COLUMN_NAMES((byte)1),
+        NON_ENCODED_COLUMN_NAMES((byte)2);
+
+        private final byte[] byteValue;
+        private final byte serializedValue;
+
+        StorageScheme(byte serializedValue) {
+            this.serializedValue = serializedValue;
+            this.byteValue = Bytes.toBytes(this.name());
+        }
+
+        public byte[] getBytes() {
+            return byteValue;
+        }
+
+        public byte getSerializedValue() {
+            return this.serializedValue;
+        }
+
+        public static StorageScheme fromSerializedValue(byte serializedValue) {
+            if (serializedValue < 1 || serializedValue > StorageScheme.values().length) {
+                return null;
+            }
+            return StorageScheme.values()[serializedValue-1];
+        }
+    }
 
     long getTimeStamp();
     long getSequenceNumber();
@@ -209,7 +237,17 @@ public interface PTable extends PMetaDataEntity {
      * can be found
      * @throws AmbiguousColumnException if multiple columns are found with the given name
      */
-    PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException;
+    //TODO: samarth inspect all the callers of this method.
+    PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException;
+    
+    /**
+     * Get the column with the given column qualifier.
+     * @param column qualifier bytes
+     * @return the PColumn with the given column qualifier
+     * @throws ColumnNotFoundException if no column with the given column qualifier can be found
+     * @throws AmbiguousColumnException if multiple columns are found with the given column qualifier
+     */
+    PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException; 
     
     /**
      * Get the PK column with the given name.
@@ -341,4 +379,5 @@ public interface PTable extends PMetaDataEntity {
      */
     int getRowTimestampColPos();
     long getUpdateCacheFrequency();
+    StorageScheme getStorageScheme();
 }