You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/07/20 19:15:03 UTC

[33/50] [abbrv] phoenix git commit: PHOENIX-2067 Sort order incorrect for variable length DESC columns

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 0956753..a12f633 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -59,14 +59,10 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.AndParseNode;
-import org.apache.phoenix.parse.BaseParseNodeVisitor;
-import org.apache.phoenix.parse.BooleanParseNodeVisitor;
 import org.apache.phoenix.parse.FunctionParseNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
-import org.apache.phoenix.parse.TraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.UDFParseNode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -265,6 +261,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private int[] dataPkPosition;
     private int maxTrailingNulls;
     private ColumnReference dataEmptyKeyValueRef;
+    private boolean rowKeyOrderOptimizable;
     
     private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) {
         this.dataRowKeySchema = dataRowKeySchema;
@@ -273,6 +270,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
 
     private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
+        this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;
@@ -434,7 +432,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 dataRowKeySchema.next(ptr, dataPosOffset, maxRowKeyOffset);
                 output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
                 if (!dataRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
-                    output.writeByte(QueryConstants.SEPARATOR_BYTE);
+                    output.writeByte(SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, ptr.getLength()==0, dataRowKeySchema.getField(dataPosOffset)));
                 }
                 dataPosOffset++;
             }
@@ -481,21 +479,22 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 }
                 boolean isDataColumnInverted = dataSortOrder != SortOrder.ASC;
                 PDataType indexColumnType = IndexUtil.getIndexColumnDataType(isNullable, dataColumnType);
-                boolean isBytesComparable = dataColumnType.isBytesComparableWith(indexColumnType) ;
-                if (isBytesComparable && isDataColumnInverted == descIndexColumnBitSet.get(i)) {
+                boolean isBytesComparable = dataColumnType.isBytesComparableWith(indexColumnType);
+                boolean isIndexColumnDesc = descIndexColumnBitSet.get(i);
+                if (isBytesComparable && isDataColumnInverted == isIndexColumnDesc) {
                     output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
                 } else {
                     if (!isBytesComparable)  {
                         indexColumnType.coerceBytes(ptr, dataColumnType, dataSortOrder, SortOrder.getDefault());
                     }
-                    if (descIndexColumnBitSet.get(i) != isDataColumnInverted) {
+                    if (isDataColumnInverted != isIndexColumnDesc) {
                         writeInverted(ptr.get(), ptr.getOffset(), ptr.getLength(), output);
                     } else {
                         output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
                     }
                 }
                 if (!indexColumnType.isFixedWidth()) {
-                    output.writeByte(QueryConstants.SEPARATOR_BYTE);
+                    output.writeByte(SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, ptr.getLength() == 0, isIndexColumnDesc ? SortOrder.DESC : SortOrder.ASC));
                 }
             }
             int length = stream.size();
@@ -545,7 +544,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 indexRowKeySchema.next(ptr, indexPosOffset, maxRowKeyOffset);
                 output.write(ptr.get(), ptr.getOffset(), ptr.getLength());
                 if (!dataRowKeySchema.getField(dataPosOffset).getDataType().isFixedWidth()) {
-                    output.writeByte(QueryConstants.SEPARATOR_BYTE);
+                    output.writeByte(SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, ptr.getLength() == 0, dataRowKeySchema.getField(dataPosOffset)));
                 }
                 indexPosOffset++;
                 dataPosOffset++;
@@ -587,8 +586,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         }
                     }
                 }
-                if (!dataRowKeySchema.getField(i).getDataType().isFixedWidth() && ((i+1) !=  dataRowKeySchema.getFieldCount())) {
-                    output.writeByte(QueryConstants.SEPARATOR_BYTE);
+                // Write separator byte if variable length unless it's the last field in the schema
+                // (but we still need to write it if it's DESC to ensure sort order is correct).
+                byte sepByte = SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, ptr.getLength() == 0, dataRowKeySchema.getField(i));
+                if (!dataRowKeySchema.getField(i).getDataType().isFixedWidth() && (((i+1) !=  dataRowKeySchema.getFieldCount()) || sepByte == QueryConstants.DESC_SEPARATOR_BYTE)) {
+                    output.writeByte(sepByte);
                 }
             }
             int length = stream.size();
@@ -658,6 +660,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     private RowKeySchema generateIndexRowKeySchema() {
         int nIndexedColumns = getIndexPkColumnCount() + (isMultiTenant ? 1 : 0) + (!isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0) + (viewIndexId != null ? 1 : 0) - getNumViewConstants();
         RowKeySchema.RowKeySchemaBuilder builder = new RowKeySchema.RowKeySchemaBuilder(nIndexedColumns);
+        builder.rowKeyOrderOptimizable(rowKeyOrderOptimizable);
         if (!isLocalIndex && nIndexSaltBuckets > 0) {
             builder.addField(SaltingUtil.SALTING_COLUMN, false, SortOrder.ASC);
             nIndexedColumns--;
@@ -708,44 +711,67 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // same for all rows in this index)
             if (!viewConstantColumnBitSet.get(i)) {
                 int pos = rowKeyMetaData.getIndexPkPosition(i-dataPosOffset);
-                indexFields[pos] = dataRowKeySchema.getField(i);
+                Field dataField = dataRowKeySchema.getField(i);
+                indexFields[pos] = 
+                        dataRowKeySchema.getField(i);
             } 
         }
+        BitSet descIndexColumnBitSet = rowKeyMetaData.getDescIndexColumnBitSet();
         Iterator<Expression> expressionItr = indexedExpressions.iterator();
-        for (Field indexField : indexFields) {
-            if (indexField == null) { // Add field for kv column in index
-                final PDataType dataType = expressionItr.next().getDataType();
-                builder.addField(new PDatum() {
+        for (int i = 0; i < indexFields.length; i++) {
+            Field indexField = indexFields[i];
+            PDataType dataTypeToBe;
+            SortOrder sortOrderToBe;
+            boolean isNullableToBe;
+            Integer maxLengthToBe;
+            Integer scaleToBe;
+            if (indexField == null) {
+                Expression e = expressionItr.next();
+                isNullableToBe = true;
+                dataTypeToBe = IndexUtil.getIndexColumnDataType(isNullableToBe, e.getDataType());
+                sortOrderToBe = descIndexColumnBitSet.get(i) ? SortOrder.DESC : SortOrder.ASC;
+                maxLengthToBe = e.getMaxLength();
+                scaleToBe = e.getScale();
+            } else {
+                isNullableToBe = indexField.isNullable();
+                dataTypeToBe = IndexUtil.getIndexColumnDataType(isNullableToBe, indexField.getDataType());
+                sortOrderToBe = descIndexColumnBitSet.get(i) ? SortOrder.DESC : SortOrder.ASC;
+                maxLengthToBe = indexField.getMaxLength();
+                scaleToBe = indexField.getScale();
+            }
+            final PDataType dataType = dataTypeToBe;
+            final SortOrder sortOrder = sortOrderToBe;
+            final boolean isNullable = isNullableToBe;
+            final Integer maxLength = maxLengthToBe;
+            final Integer scale = scaleToBe;
+            builder.addField(new PDatum() {
 
-                    @Override
-                    public boolean isNullable() {
-                        return true;
-                    }
+                @Override
+                public boolean isNullable() {
+                    return isNullable;
+                }
 
-                    @Override
-                    public PDataType getDataType() {
-                        return IndexUtil.getIndexColumnDataType(true, dataType);
-                    }
+                @Override
+                public PDataType getDataType() {
+                    return dataType;
+                }
 
-                    @Override
-                    public Integer getMaxLength() {
-                        return null;
-                    }
+                @Override
+                public Integer getMaxLength() {
+                    return maxLength;
+                }
 
-                    @Override
-                    public Integer getScale() {
-                        return null;
-                    }
+                @Override
+                public Integer getScale() {
+                    return scale;
+                }
 
-                    @Override
-                    public SortOrder getSortOrder() {
-                        return SortOrder.getDefault();
-                    }
-                    
-                }, true, SortOrder.getDefault());
-            } else { // add field from data row key
-                builder.addField(indexField);
-            }
+                @Override
+                public SortOrder getSortOrder() {
+                    return sortOrder;
+                }
+                
+            }, true, sortOrder);
         }
         return builder.build();
     }
@@ -928,9 +954,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             byte[] cq = Bytes.readByteArray(input);
             coveredColumns.add(new ColumnReference(cf,cq));
         }
-        indexTableName = Bytes.readByteArray(input);
-        dataEmptyKeyValueCF = Bytes.readByteArray(input);
+        // Hack to deserialize whether the index row key is optimizable: a negative length means it is not
         int len = WritableUtils.readVInt(input);
+        if (len < 0) {
+            rowKeyOrderOptimizable = false;
+            len *= -1;
+        } else {
+            rowKeyOrderOptimizable = true;
+        }
+        indexTableName = new byte[len];
+        input.readFully(indexTableName, 0, len);
+        dataEmptyKeyValueCF = Bytes.readByteArray(input);
+        len = WritableUtils.readVInt(input);
         //TODO remove this in the next major release
         boolean isNewClient = false;
         if (len < 0) {
@@ -1023,7 +1058,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             Bytes.writeByteArray(output, ref.getFamily());
             Bytes.writeByteArray(output, ref.getQualifier());
         }
-        Bytes.writeByteArray(output, indexTableName);
+        // Hack: rowKeyOrderOptimizable is encoded in the sign of the name length (negative = not optimizable). TODO: remove when no longer needed
+        WritableUtils.writeVInt(output,indexTableName.length * (rowKeyOrderOptimizable ? 1 : -1));
+        output.write(indexTableName, 0, indexTableName.length);
         Bytes.writeByteArray(output, dataEmptyKeyValueCF);
         // TODO in order to maintain b/w compatibility encode emptyKeyValueCFPtr.getLength() as a negative value (so we can distinguish between new and old clients)
         // when indexedColumnTypes is removed, remove this 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index cf66d93..a0aefaa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -51,6 +51,8 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.filter.ColumnProjectionFilter;
@@ -67,7 +69,6 @@ import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
-import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
@@ -134,6 +135,68 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return true;
     }
     
+    private static void initializeScan(QueryPlan plan, Integer perScanLimit) {
+        StatementContext context = plan.getContext();
+        TableRef tableRef = plan.getTableRef();
+        PTable table = tableRef.getTable();
+        Scan scan = context.getScan();
+
+        Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
+        // Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix their row keys
+        if (context.getConnection().isDescVarLengthRowKeyUpgrade()) {
+            // We project *all* KeyValues across all column families as we make a pass over
+            // a physical table and we want to make sure we catch all KeyValues that may be
+            // dynamic or part of an updatable view.
+            familyMap.clear();
+            scan.setMaxVersions();
+            scan.setFilter(null); // Remove any filter
+            scan.setRaw(true); // Traverse (and subsequently clone) all KeyValues
+            // Pass over PTable so we can re-write rows according to the row key schema
+            scan.setAttribute(BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY, UngroupedAggregateRegionObserver.serialize(table));
+        } else {
+            FilterableStatement statement = plan.getStatement();
+            RowProjector projector = plan.getProjector();
+            boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty();
+            if (projector.isProjectEmptyKeyValue()) {
+                // If nothing projected into scan and we only have one column family, just allow everything
+                // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
+                // be quite a bit faster.
+                // Where condition columns also will get added into familyMap
+                // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning.
+                if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
+                        && table.getColumnFamilies().size() == 1) {
+                    // Project the one column family. We must project a column family since it's possible
+                    // that there are other non declared column families that we need to ignore.
+                    scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
+                } else {
+                    byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
+                    // Project empty key value unless the column family containing it has
+                    // been projected in its entirety.
+                    if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
+                        scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
+                    }
+                }
+            } else if (table.getViewType() == ViewType.MAPPED) {
+                // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
+                // selected column values are returned back to client
+                for (PColumnFamily family : table.getColumnFamilies()) {
+                    scan.addFamily(family.getName().getBytes());
+                }
+            }
+            // Add FirstKeyOnlyFilter if there are no references to key value columns
+            if (keyOnlyFilter) {
+                ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
+            }
+            
+            // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
+            if (perScanLimit != null) {
+                ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
+            }
+    
+            doColumnProjectionOptimization(context, scan, table, statement);
+        }
+    }
+    
     public BaseResultIterators(QueryPlan plan, Integer perScanLimit, ParallelScanGrouper scanGrouper) throws SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit());
         this.plan = plan;
@@ -141,52 +204,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
-        FilterableStatement statement = plan.getStatement();
-        RowProjector projector = plan.getProjector();
         physicalTableName = table.getPhysicalName().getBytes();
         tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
-        Scan scan = context.getScan();
         // Used to tie all the scans together during logging
         scanId = UUID.randomUUID().toString();
-        Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
-        boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty();
-        if (projector.isProjectEmptyKeyValue()) {
-            // If nothing projected into scan and we only have one column family, just allow everything
-            // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
-            // be quite a bit faster.
-            // Where condition columns also will get added into familyMap
-            // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning.
-            if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty()
-                    && table.getColumnFamilies().size() == 1) {
-                // Project the one column family. We must project a column family since it's possible
-                // that there are other non declared column families that we need to ignore.
-                scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
-            } else {
-                byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
-                // Project empty key value unless the column family containing it has
-                // been projected in its entirety.
-                if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
-                    scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
-                }
-            }
-        } else if (table.getViewType() == ViewType.MAPPED) {
-            // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the
-            // selected column values are returned back to client
-            for (PColumnFamily family : table.getColumnFamilies()) {
-                scan.addFamily(family.getName().getBytes());
-            }
-        }
-        // Add FirstKeyOnlyFilter if there are no references to key value columns
-        if (keyOnlyFilter) {
-            ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
-        }
         
-        // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization.
-        if (perScanLimit != null) {
-            ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
-        }
-
-        doColumnProjectionOptimization(context, scan, table, statement);
+        initializeScan(plan, perScanLimit);
         
         this.scans = getParallelScans();
         List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
@@ -200,7 +223,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         this.allFutures = Lists.newArrayListWithExpectedSize(1);
     }
 
-    private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
+    private static void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
         Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
         if (familyMap != null && !familyMap.isEmpty()) {
             // columnsTracker contain cf -> qualifiers which should get returned.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 7b7d4dc..d999ecb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -27,15 +27,17 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.OrderByExpression;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.SizedUtil;
 
 /**
  * Result scanner that sorts aggregated rows by columns specified in the ORDER BY clause.
@@ -155,7 +157,12 @@ public class OrderedResultIterator implements PeekingResultIterator {
         Ordering<ResultEntry> ordering = null;
         int pos = 0;
         for (OrderByExpression col : orderByExpressions) {
-            Ordering<ImmutableBytesWritable> o = Ordering.from(new ImmutableBytesWritable.Comparator());
+            Expression e = col.getExpression();
+            Comparator<ImmutableBytesWritable> comparator = 
+                    e.getSortOrder() == SortOrder.DESC && !e.getDataType().isFixedWidth() 
+                    ? buildDescVarLengthComparator() 
+                    : new ImmutableBytesWritable.Comparator();
+            Ordering<ImmutableBytesWritable> o = Ordering.from(comparator);
             if(!col.isAscending()) o = o.reverse();
             o = col.isNullsLast() ? o.nullsLast() : o.nullsFirst();
             Ordering<ResultEntry> entryOrdering = o.onResultOf(new NthKey(pos++));
@@ -164,6 +171,23 @@ public class OrderedResultIterator implements PeekingResultIterator {
         return ordering;
     }
 
+    /*
+     * Same as regular comparator, but if all the bytes match and the length is
+     * different, returns the longer length as bigger.
+     */
+    private static Comparator<ImmutableBytesWritable> buildDescVarLengthComparator() {
+        return new Comparator<ImmutableBytesWritable>() {
+
+            @Override
+            public int compare(ImmutableBytesWritable o1, ImmutableBytesWritable o2) {
+                return DescVarLengthFastByteComparisons.compareTo(
+                        o1.get(), o1.getOffset(), o1.getLength(),
+                        o2.get(), o2.getOffset(), o2.getLength());
+            }
+            
+        };
+    }
+    
     @Override
     public Tuple next() throws SQLException {
         return getResultIterator().next();
@@ -252,13 +276,13 @@ public class OrderedResultIterator implements PeekingResultIterator {
         planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW"  + (limit == 1 ? "" : "S"))  + " SORTED BY " + orderByExpressions.toString());
     }
 
-	@Override
-	public String toString() {
-		return "OrderedResultIterator [thresholdBytes=" + thresholdBytes
-				+ ", limit=" + limit + ", delegate=" + delegate
-				+ ", orderByExpressions=" + orderByExpressions
-				+ ", estimatedByteSize=" + estimatedByteSize
-				+ ", resultIterator=" + resultIterator + ", byteSize="
-				+ byteSize + "]";
-	}
+    @Override
+    public String toString() {
+        return "OrderedResultIterator [thresholdBytes=" + thresholdBytes
+                + ", limit=" + limit + ", delegate=" + delegate
+                + ", orderByExpressions=" + orderByExpressions
+                + ", estimatedByteSize=" + estimatedByteSize
+                + ", resultIterator=" + resultIterator + ", byteSize="
+                + byteSize + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 5805999..0cd76fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -140,6 +140,9 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private Consistency consistency = Consistency.STRONG;
     private Map<String, String> customTracingAnnotations = emptyMap();
     private final boolean isRequestLevelMetricsEnabled;
+    private final boolean isDescVarLengthRowKeyUpgrade;
+    
+    
     static {
         Tracing.addTraceMetricsSource();
     }
@@ -150,28 +153,35 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         return props;
     }
 
-    public PhoenixConnection(PhoenixConnection connection) throws SQLException {
-        this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache());
+    public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade) throws SQLException {
+        this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), isDescRowKeyOrderUpgrade);
         this.isAutoCommit = connection.isAutoCommit;
         this.sampler = connection.sampler;
         this.statementExecutionCounter = connection.statementExecutionCounter;
     }
+
+    public PhoenixConnection(PhoenixConnection connection) throws SQLException {
+        this(connection, connection.isDescVarLengthRowKeyUpgrade);
+    }
     
     public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException {
         this(connection.getQueryServices(), connection, scn);
-        this.sampler = connection.sampler;
-        this.statementExecutionCounter = connection.statementExecutionCounter;
     }
     
     public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
-        this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache());
+        this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache(), connection.isDescVarLengthRowKeyUpgrade());
         this.isAutoCommit = connection.isAutoCommit;
         this.sampler = connection.sampler;
         this.statementExecutionCounter = connection.statementExecutionCounter;
     }
     
     public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException {
+        this(services, url, info, metaData, false);
+    }
+
+    public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, boolean isDescVarLengthRowKeyUpgrade) throws SQLException {
         this.url = url;
+        this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;
         // Copy so client cannot change
         this.info = info == null ? new Properties() : PropertiesUtil.deepCopy(info);
         final PName tenantId = JDBCUtil.getTenantId(url, info);
@@ -887,4 +897,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
             mutationState.getReadMetricQueue().clearMetrics();
         }
     }
+
+    /**
+     * Returns true if this connection is being used to upgrade the
+     * data due to PHOENIX-2067 and false otherwise.
+     * @return
+     */
+    public boolean isDescVarLengthRowKeyUpgrade() {
+        return isDescVarLengthRowKeyUpgrade;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 9cd32e8..feb5989 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -162,6 +162,7 @@ import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
@@ -1831,7 +1832,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
      * This closes the passed connection.
      */
     private PhoenixConnection addColumn(PhoenixConnection oldMetaConnection, String tableName, long timestamp, String columns, boolean addIfNotExists) throws SQLException {
-        Properties props = new Properties(oldMetaConnection.getClientInfo());
+        Properties props = PropertiesUtil.deepCopy(oldMetaConnection.getClientInfo());
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp));
         // Cannot go through DriverManager or you end up in an infinite loop because it'll call init again
         PhoenixConnection metaConnection = new PhoenixConnection(this, oldMetaConnection.getURL(), props, oldMetaConnection.getMetaDataCache());
@@ -1970,6 +1971,20 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                          */
                                         logger.debug("No need to run 4.5 upgrade");
                                     }
+                                    Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo());
+                                    props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+                                    props.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
+                                    PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache());
+                                    try {
+                                        Set<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescVarLengthRowKey(conn);
+                                        if (!tablesNeedingUpgrade.isEmpty()) {
+                                            logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns (PHOENIX-2067):\n" + Joiner.on(' ').join(tablesNeedingUpgrade));
+                                        }
+                                    } catch (Exception ex) {
+                                        logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex);
+                                    } finally {
+                                        conn.close();
+                                    }
                                 }
                             }
                             int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
index afcc741..bca55e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/KeyRange.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.query;
 
-import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -394,18 +392,6 @@ public class KeyRange implements Writable {
         return lowerRange == KeyRange.EMPTY_RANGE.getLowerRange() && upperRange == KeyRange.EMPTY_RANGE.getUpperRange();
     }
 
-    public KeyRange appendSeparator() {
-        byte[] lowerBound = getLowerRange();
-        byte[] upperBound = getUpperRange();
-        if (lowerBound != UNBOUND) {
-            lowerBound = ByteUtil.concat(lowerBound, SEPARATOR_BYTE_ARRAY);
-        }
-        if (upperBound != UNBOUND) {
-            upperBound = ByteUtil.concat(upperBound, SEPARATOR_BYTE_ARRAY);
-        }
-        return getKeyRange(lowerBound, lowerInclusive, upperBound, upperInclusive);
-    }
-
     /**
      * @return list of at least size 1
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index d095049..92479b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -114,6 +114,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.MetaDataSplitPolicy;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.util.ByteUtil;
 
 
@@ -163,6 +164,8 @@ public interface QueryConstants {
      */
     public static final byte SEPARATOR_BYTE = (byte) 0;
     public static final byte[] SEPARATOR_BYTE_ARRAY = new byte[] {SEPARATOR_BYTE};
+    public static final byte DESC_SEPARATOR_BYTE = SortOrder.invert(SEPARATOR_BYTE);
+    public static final byte[] DESC_SEPARATOR_BYTE_ARRAY = new byte[] {DESC_SEPARATOR_BYTE};
 
     public static final String DEFAULT_COPROCESS_PATH = "phoenix.jar";
     public final static int MILLIS_IN_DAY = 1000 * 60 * 60 * 24;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 2a43679..0251da1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -241,4 +241,9 @@ public class DelegateTable implements PTable {
     public int getBaseColumnCount() {
         return delegate.getBaseColumnCount();
     }
+
+    @Override
+    public boolean rowKeyOrderOptimizable() {
+        return delegate.rowKeyOrderOptimizable();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9ad52a5..b1fcf30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -189,6 +189,7 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -702,8 +703,13 @@ public class MetaDataClient {
                     sortOrder = pkSortOrder.getSecond();
                 }
             }
-
             String columnName = columnDefName.getColumnName();
+            if (isPK && sortOrder == SortOrder.DESC && def.getDataType() == PVarbinary.INSTANCE) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.DESC_VARBINARY_NOT_SUPPORTED)
+                    .setColumnName(columnName)
+                    .build().buildException();
+            }
+
             PName familyName = null;
             if (def.isPK() && !pkConstraint.getColumnNames().isEmpty() ) {
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_ALREADY_EXISTS)
@@ -1432,6 +1438,8 @@ public class MetaDataClient {
             String parentTableName = null;
             PName tenantId = connection.getTenantId();
             String tenantIdStr = tenantId == null ? null : connection.getTenantId().getString();
+            Long scn = connection.getSCN();
+            long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
             boolean multiTenant = false;
             boolean storeNulls = false;
             Integer saltBucketNum = null;
@@ -1439,6 +1447,7 @@ public class MetaDataClient {
             boolean isImmutableRows = false;
             List<PName> physicalNames = Collections.emptyList();
             boolean addSaltColumn = false;
+            boolean rowKeyOrderOptimizable = true;
             if (parent != null && tableType == PTableType.INDEX) {
                 // Index on view
                 // TODO: Can we support a multi-tenant index directly on a multi-tenant
@@ -1464,7 +1473,7 @@ public class MetaDataClient {
                 parentTableName = parent.getTableName().getString();
                 // Pass through data table sequence number so we can check it hasn't changed
                 PreparedStatement incrementStatement = connection.prepareStatement(INCREMENT_SEQ_NUM);
-                incrementStatement.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+                incrementStatement.setString(1, tenantIdStr);
                 incrementStatement.setString(2, schemaName);
                 incrementStatement.setString(3, parentTableName);
                 incrementStatement.setLong(4, parent.getSequenceNumber());
@@ -1476,7 +1485,7 @@ public class MetaDataClient {
 
                 // Add row linking from data table row to index table row
                 PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK);
-                linkStatement.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+                linkStatement.setString(1, tenantIdStr);
                 linkStatement.setString(2, schemaName);
                 linkStatement.setString(3, parentTableName);
                 linkStatement.setString(4, tableName);
@@ -1589,6 +1598,12 @@ public class MetaDataClient {
                 } else {
                     // Propagate property values to VIEW.
                     // TODO: formalize the known set of these properties
+                    // Manually transfer the ROW_KEY_ORDER_OPTIMIZABLE_BYTES from parent as we don't
+                    // want to add this hacky flag to the schema (see PHOENIX-2067).
+                    rowKeyOrderOptimizable = parent.rowKeyOrderOptimizable();
+                    if (rowKeyOrderOptimizable) {
+                        UpgradeUtil.addRowKeyOrderOptimizableCell(tableMetaData, SchemaUtil.getTableKey(tenantIdStr, schemaName, tableName), clientTimeStamp);
+                    }
                     multiTenant = parent.isMultiTenant();
                     saltBucketNum = parent.getBucketNum();
                     isImmutableRows = parent.isImmutableRows();
@@ -1606,7 +1621,7 @@ public class MetaDataClient {
                     // FIXME: not currently used, but see PHOENIX-1367
                     // as fixing that will require it's usage.
                     PreparedStatement linkStatement = connection.prepareStatement(CREATE_VIEW_LINK);
-                    linkStatement.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+                    linkStatement.setString(1, tenantIdStr);
                     linkStatement.setString(2, schemaName);
                     linkStatement.setString(3, tableName);
                     linkStatement.setString(4, parent.getName().getString());
@@ -1629,7 +1644,7 @@ public class MetaDataClient {
                     // Add row linking from data table row to physical table row
                     PreparedStatement linkStatement = connection.prepareStatement(CREATE_LINK);
                     for (PName physicalName : physicalNames) {
-                        linkStatement.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString());
+                        linkStatement.setString(1, tenantIdStr);
                         linkStatement.setString(2, schemaName);
                         linkStatement.setString(3, tableName);
                         linkStatement.setString(4, physicalName.getString());
@@ -1788,7 +1803,7 @@ public class MetaDataClient {
                         Collections.<PTable>emptyList(), isImmutableRows,
                         Collections.<PName>emptyList(), defaultFamilyName == null ? null :
                                 PNameFactory.newName(defaultFamilyName), null,
-                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType);
+                        Boolean.TRUE.equals(disableWAL), false, false, null, indexId, indexType, true);
                 connection.addTable(table);
             } else if (tableType == PTableType.INDEX && indexId == null) {
                 if (tableProps.get(HTableDescriptor.MAX_FILESIZE) == null) {
@@ -1942,7 +1957,7 @@ public class MetaDataClient {
                         PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns,
                         dataTableName == null ? null : newSchemaName, dataTableName == null ? null : PNameFactory.newName(dataTableName), Collections.<PTable>emptyList(), isImmutableRows,
                         physicalNames, defaultFamilyName == null ? null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType,
-                        indexId, indexType);
+                        indexId, indexType, rowKeyOrderOptimizable);
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
                 addTableToCache(result);
                 return table;
@@ -2889,6 +2904,8 @@ public class MetaDataClient {
             if (code == MutationCode.TABLE_ALREADY_EXISTS) {
                 if (result.getTable() != null) { // To accommodate connection-less update of index state
                     addTableToCache(result);
+                    // Set so that we get the table below with the potentially modified rowKeyOrderOptimizable flag set
+                    indexRef.setTable(result.getTable());
                 }
             }
             if (newIndexState == PIndexState.BUILDING) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index b983074..8da2206 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -323,4 +323,13 @@ public interface PTable extends PMetaDataEntity {
     IndexType getIndexType();
     PTableStats getTableStats();
     int getBaseColumnCount();
+
+    /**
+     * Determines whether or not we may optimize out an ORDER BY or do a GROUP BY
+     * in-place when the optimizer tells us it's possible. This is due to PHOENIX-2067
+     * and only applicable for tables using DESC primary key column(s) which have
+     * not been upgraded.
+     * @return true if row key order optimizations are possible
+     */
+    boolean rowKeyOrderOptimizable();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index a947bfc..1756c2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.schema;
 
 import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.addQuietly;
 import static org.apache.phoenix.hbase.index.util.KeyValueBuilder.deleteQuietly;
-import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE;
 import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
 
 import java.io.IOException;
@@ -126,6 +125,8 @@ public class PTableImpl implements PTable {
     private IndexType indexType;
     private PTableStats tableStats = PTableStats.EMPTY_STATS;
     private int baseColumnCount;
+    private boolean hasDescVarLengthColumns;
+    private boolean rowKeyOrderOptimizable;
 
     public PTableImpl() {
         this.indexes = Collections.emptyList();
@@ -195,7 +196,8 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
     }
 
     public static PTableImpl makePTable(PTable table, List<PColumn> columns) throws SQLException {
@@ -203,7 +205,8 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns) throws SQLException {
@@ -211,7 +214,8 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
-                table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
+                table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
     }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows) throws SQLException {
@@ -219,7 +223,8 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
+                table.getIndexType(), table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, List<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) throws SQLException {
@@ -227,7 +232,8 @@ public class PTableImpl implements PTable {
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
+                isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
     }
     
     public static PTableImpl makePTable(PTable table, PIndexState state) throws SQLException {
@@ -236,7 +242,18 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table),
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(), table.getBaseColumnCount());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
+                table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
+    }
+
+    public static PTableImpl makePTable(PTable table, boolean rowKeyOrderOptimizable) throws SQLException {
+        return new PTableImpl(
+                table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
+                table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table),
+                table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
+                table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), table.getTableStats(),
+                table.getBaseColumnCount(), rowKeyOrderOptimizable);
     }
 
     public static PTableImpl makePTable(PTable table, PTableStats stats) throws SQLException {
@@ -245,28 +262,29 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table),
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
-                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats, table.getBaseColumnCount());
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), stats,
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable());
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state, long timeStamp, long sequenceNumber,
             PName pkName, Integer bucketNum, List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
-            boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType) throws SQLException {
+            boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, boolean rowKeyOrderOptimizable) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
-                indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT);
+                indexType, PTableStats.EMPTY_STATS, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable);
     }
 
     public static PTableImpl makePTable(PName tenantId, PName schemaName, PName tableName, PTableType type,
             PIndexState state, long timeStamp, long sequenceNumber, PName pkName, Integer bucketNum,
             List<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
-            boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount)
+            boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, @NotNull PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable)
             throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
-                defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount);
+                defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable);
     }
 
     private PTableImpl(PName tenantId, PName schemaName, PName tableName, PTableType type, PIndexState state,
@@ -274,10 +292,10 @@ public class PTableImpl implements PTable {
             PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
-            PTableStats stats, int baseColumnCount) throws SQLException {
+            PTableStats stats, int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 stats, schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
-                viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount);
+                viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable);
     }
 
     @Override
@@ -305,7 +323,7 @@ public class PTableImpl implements PTable {
             PName pkName, Integer bucketNum, List<PColumn> columns, PTableStats stats, PName parentSchemaName, PName parentTableName,
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
-            IndexType indexType , int baseColumnCount) throws SQLException {
+            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable) throws SQLException {
         Preconditions.checkNotNull(schemaName);
         Preconditions.checkArgument(tenantId==null || tenantId.getBytes().length > 0); // tenantId should be null or not empty
         int estimatedSize = SizedUtil.OBJECT_SIZE * 2 + 23 * SizedUtil.POINTER_SIZE + 4 * SizedUtil.INT_SIZE + 2 * SizedUtil.LONG_SIZE + 2 * SizedUtil.INT_OBJECT_SIZE +
@@ -335,6 +353,7 @@ public class PTableImpl implements PTable {
         this.viewIndexId = viewIndexId;
         this.indexType = indexType;
         this.tableStats = stats;
+        this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
         List<PColumn> pkColumns;
         PColumn[] allColumns;
         
@@ -386,7 +405,8 @@ public class PTableImpl implements PTable {
         for (PColumn column : allColumns) {
             PName familyName = column.getFamilyName();
             if (familyName == null) {
-            	 pkColumns.add(column);
+                hasDescVarLengthColumns |= (column.getSortOrder() == SortOrder.DESC && !column.getDataType().isFixedWidth());
+            	pkColumns.add(column);
             }
             if (familyName == null) {
                 estimatedSize += column.getEstimatedSize(); // PK columns
@@ -401,6 +421,7 @@ public class PTableImpl implements PTable {
             }
         }
         this.pkColumns = ImmutableList.copyOf(pkColumns);
+        builder.rowKeyOrderOptimizable(this.rowKeyOrderOptimizable()); // after hasDescVarLengthColumns is calculated
         this.rowKeySchema = builder.build();
         estimatedSize += rowKeySchema.getEstimatedSize();
         Iterator<Map.Entry<PName,List<PColumn>>> iterator = familyMap.entrySet().iterator();
@@ -500,18 +521,22 @@ public class PTableImpl implements PTable {
             List<PColumn> columns = getPKColumns();
             int nColumns = columns.size();
             PDataType type = null;
+            SortOrder sortOrder = null;
+            boolean wasNull = false;
             while (i < nValues && i < nColumns) {
                 // Separate variable length column values in key with zero byte
                 if (type != null && !type.isFixedWidth()) {
-                    os.write(SEPARATOR_BYTE);
+                    os.write(SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable(), wasNull, sortOrder));
                 }
                 PColumn column = columns.get(i);
+                sortOrder = column.getSortOrder();
                 type = column.getDataType();
                 // This will throw if the value is null and the type doesn't allow null
                 byte[] byteValue = values[i++];
                 if (byteValue == null) {
                     byteValue = ByteUtil.EMPTY_BYTE_ARRAY;
                 }
+                wasNull = byteValue.length == 0;
                 // An empty byte array return value means null. Do this,
                 // since a type may have muliple representations of null.
                 // For example, VARCHAR treats both null and an empty string
@@ -529,11 +554,14 @@ public class PTableImpl implements PTable {
                 }
                 os.write(byteValue, 0, byteValue.length);
             }
+            // A trailing variable-length column whose separator is the DESC separator byte must also be terminated by it
+            if (type != null && !type.isFixedWidth() && SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable(), wasNull, sortOrder) == QueryConstants.DESC_SEPARATOR_BYTE) {
+                os.write(QueryConstants.DESC_SEPARATOR_BYTE);
+            }
             // If some non null pk values aren't set, then throw
             if (i < nColumns) {
                 PColumn column = columns.get(i);
-                type = column.getDataType();
-                if (type.isFixedWidth() || !column.isNullable()) {
+                if (column.getDataType().isFixedWidth() || !column.isNullable()) {
                     throw new ConstraintViolationException(name.getString() + "." + column.getName().getString() + " may not be null");
                 }
             }
@@ -977,12 +1005,16 @@ public class PTableImpl implements PTable {
           baseColumnCount = table.getBaseColumnCount();
       }
 
+      boolean rowKeyOrderOptimizable = false;
+      if (table.hasRowKeyOrderOptimizable()) {
+          rowKeyOrderOptimizable = table.getRowKeyOrderOptimizable();
+      }
       try {
         PTableImpl result = new PTableImpl();
         result.init(tenantId, schemaName, tableName, tableType, indexState, timeStamp, sequenceNumber, pkName,
           (bucketNum == NO_SALTING) ? null : bucketNum, columns, stats, schemaName,dataTableName, indexes,
               isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
-                multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount);
+                multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable);
         return result;
       } catch (SQLException e) {
         throw new RuntimeException(e); // Impossible
@@ -1072,6 +1104,7 @@ public class PTableImpl implements PTable {
         }
       }
       builder.setBaseColumnCount(table.getBaseColumnCount());
+      builder.setRowKeyOrderOptimizable(table.rowKeyOrderOptimizable());
 
       return builder.build();
     }
@@ -1095,4 +1128,9 @@ public class PTableImpl implements PTable {
     public int getBaseColumnCount() {
         return baseColumnCount;
     }
+
+    @Override
+    public boolean rowKeyOrderOptimizable() {
+        return rowKeyOrderOptimizable || !hasDescVarLengthColumns;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java
index e5aa571..9d86dd6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeySchema.java
@@ -17,13 +17,12 @@
  */
 package org.apache.phoenix.schema;
 
-import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE;
-
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.SchemaUtil;
 
 
 /**
@@ -37,17 +36,19 @@ import org.apache.phoenix.query.QueryConstants;
  * @since 0.1
  */
 public class RowKeySchema extends ValueSchema {
-    public static final RowKeySchema EMPTY_SCHEMA = new RowKeySchema(0,Collections.<Field>emptyList())
+    public static final RowKeySchema EMPTY_SCHEMA = new RowKeySchema(0,Collections.<Field>emptyList(), true)
     ;
     
     public RowKeySchema() {
     }
     
-    protected RowKeySchema(int minNullable, List<Field> fields) {
-        super(minNullable, fields);
+    protected RowKeySchema(int minNullable, List<Field> fields, boolean rowKeyOrderOptimizable) {
+        super(minNullable, fields, rowKeyOrderOptimizable);
     }
 
     public static class RowKeySchemaBuilder extends ValueSchemaBuilder {
+        private boolean rowKeyOrderOptimizable = false;
+        
         public RowKeySchemaBuilder(int maxFields) {
             super(maxFields);
             setMaxFields(maxFields);
@@ -59,13 +60,22 @@ public class RowKeySchema extends ValueSchema {
             return this;
         }
 
+        public RowKeySchemaBuilder rowKeyOrderOptimizable(boolean rowKeyOrderOptimizable) {
+            this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
+            return this;
+        }
+
         @Override
         public RowKeySchema build() {
             List<Field> condensedFields = buildFields();
-            return new RowKeySchema(this.minNullable, condensedFields);
+            return new RowKeySchema(this.minNullable, condensedFields, rowKeyOrderOptimizable);
         }
     }
 
+    public boolean rowKeyOrderOptimizable() {
+        return rowKeyOrderOptimizable;
+    }
+
     public int getMaxFields() {
         return this.getMinNullable();
     }
@@ -148,13 +158,19 @@ public class RowKeySchema extends ValueSchema {
         if (field.getDataType().isFixedWidth()) {
             ptr.set(ptr.get(),ptr.getOffset(), field.getByteSize());
         } else {
-            if (position+1 == getFieldCount() ) { // Last field has no terminator
-                ptr.set(ptr.get(), ptr.getOffset(), maxOffset - ptr.getOffset());
+            if (position+1 == getFieldCount() ) {
+                // Last field has no terminator unless it's descending sort order
+                int len = maxOffset - ptr.getOffset();
+                ptr.set(ptr.get(), ptr.getOffset(), maxOffset - ptr.getOffset() - (SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, len == 0, field) == QueryConstants.DESC_SEPARATOR_BYTE ? 1 : 0));
             } else {
                 byte[] buf = ptr.get();
                 int offset = ptr.getOffset();
-                while (offset < maxOffset && buf[offset] != SEPARATOR_BYTE) {
-                    offset++;
+                // Check the first byte: a zero byte means the value is empty; otherwise scan for this field's separator byte
+                if (offset < maxOffset && buf[offset] != QueryConstants.SEPARATOR_BYTE) {
+                    byte sepByte = SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, false, field);
+                    do {
+                        offset++;
+                    } while (offset < maxOffset && buf[offset] != sepByte);
                 }
                 ptr.set(buf, ptr.getOffset(), offset - ptr.getOffset());
             }
@@ -204,8 +220,12 @@ public class RowKeySchema extends ValueSchema {
         if (!field.getDataType().isFixedWidth()) {
             byte[] buf = ptr.get();
             int offset = ptr.getOffset()-1-offsetAdjustment;
-            while (offset > minOffset /* sanity check*/ && buf[offset] != QueryConstants.SEPARATOR_BYTE) {
-                offset--;
+            // A zero-length value always uses the zero separator byte, so check for it before scanning for this field's separator
+            if (offset > minOffset && buf[offset] != QueryConstants.SEPARATOR_BYTE) {
+                byte sepByte = SchemaUtil.getSeparatorByte(rowKeyOrderOptimizable, false, field);
+                do {
+                    offset--;
+                } while (offset > minOffset && buf[offset] != sepByte);
             }
             if (offset == minOffset) { // shouldn't happen
                 ptr.set(buf, minOffset, ptr.getOffset()-minOffset-1);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java
index 03b2e6a..2a59e01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/RowKeyValueAccessor.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.schema;
 
-import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -28,6 +26,7 @@ import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.util.ByteUtil;
 
 
@@ -150,6 +149,9 @@ public class RowKeyValueAccessor implements Writable   {
         ByteUtil.serializeVIntArray(output, offsets, length);
     }
     
+    private static boolean isSeparatorByte(byte b) {
+        return b == QueryConstants.SEPARATOR_BYTE || b == QueryConstants.DESC_SEPARATOR_BYTE;
+    }
     /**
      * Calculate the byte offset in the row key to the start of the PK column value
      * @param keyBuffer the byte array of the row key
@@ -164,7 +166,7 @@ public class RowKeyValueAccessor implements Writable   {
             } else { // Else, a negative offset is the number of variable length values to skip
                 while (offset++ < 0) {
                     // FIXME: keyOffset < keyBuffer.length required because HBase passes bogus keys to filter to position scan (HBASE-6562)
-                    while (keyOffset < keyBuffer.length && keyBuffer[keyOffset++] != SEPARATOR_BYTE) {
+                    while (keyOffset < keyBuffer.length && !isSeparatorByte(keyBuffer[keyOffset++])) {
                     }
                 }
             }
@@ -181,11 +183,11 @@ public class RowKeyValueAccessor implements Writable   {
      */
     public int getLength(byte[] keyBuffer, int keyOffset, int maxOffset) {
         if (!hasSeparator) {
-            return maxOffset - keyOffset;
+            return maxOffset - keyOffset - (keyBuffer[maxOffset-1] == QueryConstants.DESC_SEPARATOR_BYTE ? 1 : 0);
         }
         int offset = keyOffset;
         // FIXME: offset < maxOffset required because HBase passes bogus keys to filter to position scan (HBASE-6562)
-        while (offset < maxOffset && keyBuffer[offset] != SEPARATOR_BYTE) {
+        while (offset < maxOffset && !isSeparatorByte(keyBuffer[offset])) {
             offset++;
         }
         return offset - keyOffset;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java
index 7660ffe..a4b40f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueSchema.java
@@ -49,12 +49,20 @@ public abstract class ValueSchema implements Writable {
     private boolean isFixedLength;
     private boolean isMaxLength;
     private int minNullable;
+    // Only applicable for RowKeySchema (and only due to PHOENIX-2067), but
+    // added here as this is where serialization is done (and we need to
+    // maintain the same serialization shape for b/w compat).
+    protected boolean rowKeyOrderOptimizable;
     
     public ValueSchema() {
     }
     
     protected ValueSchema(int minNullable, List<Field> fields) {
-        init(minNullable, fields);
+        this(minNullable, fields, true);
+    }
+    
+    protected ValueSchema(int minNullable, List<Field> fields, boolean rowKeyOrderOptimizable) {
+        init(minNullable, fields, rowKeyOrderOptimizable);
     }
     
     @Override
@@ -68,7 +76,8 @@ public abstract class ValueSchema implements Writable {
                 SizedUtil.ARRAY_SIZE + count * Field.ESTIMATED_SIZE + SizedUtil.sizeOfArrayList(count);
     }
 
-    private void init(int minNullable, List<Field> fields) {
+    private void init(int minNullable, List<Field> fields, boolean rowKeyOrderOptimizable) {
+        this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
         this.minNullable = minNullable;
         this.fields = ImmutableList.copyOf(fields);
         int estimatedLength = 0;
@@ -324,14 +333,6 @@ public abstract class ValueSchema implements Writable {
         return size;
     }
     
-    public void serialize(DataOutput output) throws IOException {
-        WritableUtils.writeVInt(output, minNullable);
-        WritableUtils.writeVInt(output, fields.size());
-        for (int i = 0; i < fields.size(); i++) {
-            fields.get(i).write(output);
-        }
-    }
-    
     public Field getField(int position) {
         return fields.get(fieldIndexByPosition[position]);
     }
@@ -366,19 +367,24 @@ public abstract class ValueSchema implements Writable {
     public void readFields(DataInput in) throws IOException {
         int minNullable = WritableUtils.readVInt(in);
         int nFields = WritableUtils.readVInt(in);
+        boolean rowKeyOrderOptimizable = false;
+        if (nFields < 0) {
+            rowKeyOrderOptimizable = true;
+            nFields *= -1;
+        }
         List<Field> fields = Lists.newArrayListWithExpectedSize(nFields);
         for (int i = 0; i < nFields; i++) {
             Field field = new Field();
             field.readFields(in);
             fields.add(field);
         }
-        init(minNullable, fields);
+        init(minNullable, fields, rowKeyOrderOptimizable);
     }
          
     @Override
     public void write(DataOutput out) throws IOException {
         WritableUtils.writeVInt(out, minNullable);
-        WritableUtils.writeVInt(out, fields.size());
+        WritableUtils.writeVInt(out, fields.size() * (rowKeyOrderOptimizable ? -1 : 1));
         for (int i = 0; i < fields.size(); i++) {
             fields.get(i).write(out);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2620a80c/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index ebb7d1f..e692470 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -60,10 +60,10 @@ public class StatisticsUtil {
         int offset = 0;
         System.arraycopy(table, 0, rowKey, offset, table.length);
         offset += table.length;
-        rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
+        rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; // assumes stats table columns not DESC
         System.arraycopy(fam.get(), fam.getOffset(), rowKey, offset, fam.getLength());
         offset += fam.getLength();
-        rowKey[offset++] = QueryConstants.SEPARATOR_BYTE;
+        rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; // assumes stats table columns not DESC
         System.arraycopy(region, 0, rowKey, offset, region.length);
         return rowKey;
     }