You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/13 01:40:37 UTC

phoenix git commit: PHOENIX-1841 Implement PhoenixSort in Phoenix/Calcite Integration

Repository: phoenix
Updated Branches:
  refs/heads/calcite f04eaf172 -> ae0c234fa


PHOENIX-1841 Implement PhoenixSort in Phoenix/Calcite Integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ae0c234f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ae0c234f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ae0c234f

Branch: refs/heads/calcite
Commit: ae0c234faf9bd4edac23a7511790104cf86c097e
Parents: f04eaf1
Author: maryannxue <we...@intel.com>
Authored: Sun Apr 12 19:40:22 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Sun Apr 12 19:40:22 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java |  86 +++++++++++++-
 .../apache/phoenix/calcite/CalciteUtils.java    |  10 ++
 .../phoenix/calcite/PhoenixAggregate.java       |  15 +--
 .../apache/phoenix/calcite/PhoenixFilter.java   |   3 -
 .../org/apache/phoenix/calcite/PhoenixJoin.java |  12 +-
 .../apache/phoenix/calcite/PhoenixProject.java  |  13 +--
 .../apache/phoenix/calcite/PhoenixRules.java    |   2 +-
 .../org/apache/phoenix/calcite/PhoenixSort.java | 113 ++++++++++++++++++-
 .../apache/phoenix/compile/OrderByCompiler.java |   2 +-
 .../apache/phoenix/execute/AggregatePlan.java   |   4 +
 .../org/apache/phoenix/execute/ScanPlan.java    |   4 +
 11 files changed, 230 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index 8d2be0d..3f39768 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -345,7 +345,91 @@ public class CalciteTest extends BaseClientManagedTimeIT {
                           {"a"}, 
                           {"b"}, 
                           {"c"}})
-                .close();;
+                .close();
+    }
+    
+    @Test public void testSort() {
+        start().sql("select organization_id, entity_id, a_string from aTable order by a_string, entity_id")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" +
+                           "    PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"00D300000000XHP", "00A123122312312", "a"}, 
+                          {"00D300000000XHP", "00A223122312312", "a"}, 
+                          {"00D300000000XHP", "00A323122312312", "a"}, 
+                          {"00D300000000XHP", "00A423122312312", "a"}, 
+                          {"00D300000000XHP", "00B523122312312", "b"}, 
+                          {"00D300000000XHP", "00B623122312312", "b"}, 
+                          {"00D300000000XHP", "00B723122312312", "b"}, 
+                          {"00D300000000XHP", "00B823122312312", "b"}, 
+                          {"00D300000000XHP", "00C923122312312", "c"}})
+                .close();
+        
+        start().sql("select organization_id, entity_id, a_string from aTable order by organization_id, entity_id")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" +
+                           "    PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"00D300000000XHP", "00A123122312312", "a"}, 
+                          {"00D300000000XHP", "00A223122312312", "a"}, 
+                          {"00D300000000XHP", "00A323122312312", "a"}, 
+                          {"00D300000000XHP", "00A423122312312", "a"}, 
+                          {"00D300000000XHP", "00B523122312312", "b"}, 
+                          {"00D300000000XHP", "00B623122312312", "b"}, 
+                          {"00D300000000XHP", "00B723122312312", "b"}, 
+                          {"00D300000000XHP", "00B823122312312", "b"}, 
+                          {"00D300000000XHP", "00C923122312312", "c"}})
+                .close();
+        
+        start().sql("select count(entity_id), a_string from atable group by a_string order by count(entity_id), a_string desc")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+                           "    PhoenixSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
+                           "      PhoenixAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
+                           "        PhoenixProject(A_STRING=[$2])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {1L, "c"},
+                          {4L, "b"},
+                          {4L, "a"}})
+                .close();
+        
+        start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by count(\"item_id\"), s.name desc")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
+                           "    PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+                           "      PhoenixProject(NAME=[$1])\n" +
+                           "        PhoenixJoin(condition=[=($0, $2)], joinType=[inner])\n" +
+                           "          PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
+                           "            PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n" +
+                           "          PhoenixProject(supplier_id=[$5])\n" +
+                           "            PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"S6", 1L},
+                          {"S5", 1L},
+                          {"S2", 2L},
+                          {"S1", 2L}})
+                .close();
+        
+        start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" order by item.name desc")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixSort(sort0=[$1], dir0=[DESC])\n" +
+                           "    PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" +
+                           "      PhoenixJoin(condition=[=($2, $3)], joinType=[inner])\n" +
+                           "        PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
+                           "        PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"0000000006", "T6", "0000000006", "S6"}, 
+                          {"0000000005", "T5", "0000000005", "S5"}, 
+                          {"0000000004", "T4", "0000000002", "S2"}, 
+                          {"0000000003", "T3", "0000000002", "S2"},
+                          {"0000000002", "T2", "0000000001", "S1"},
+                          {"0000000001", "T1", "0000000001", "S1"}})
+                .close();
     }
     
     @Test public void testSubquery() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index 4110b5e..d2a4a31 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -5,6 +5,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
@@ -15,6 +17,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.calcite.PhoenixRel.Implementor;
+import org.apache.phoenix.calcite.PhoenixRel.PlanType;
 import org.apache.phoenix.expression.ComparisonExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
@@ -39,6 +42,13 @@ public class CalciteUtils {
     public static String createTempAlias() {
         return "$" + tempAliasCounter.incrementAndGet();
     }
+    
+    public static RelNode getBestRel(RelNode rel) {
+        if (rel instanceof RelSubset)
+            return ((RelSubset) rel).getBest();
+        
+        return rel;
+    }
 
 	private static final Map<SqlKind, ExpressionFactory> EXPRESSION_MAP = Maps
 			.newHashMapWithExpectedSize(ExpressionType.values().length);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
index c3d4982..ca3a34c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
@@ -7,7 +7,6 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
@@ -103,8 +102,9 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
                 basePlan = (ScanPlan) delegate;
             }
         }
-        // TopN, we can not merge with the base plan.
-        if (!plan.getOrderBy().getOrderByExpressions().isEmpty() && plan.getLimit() != null) {
+        // We can not merge with the base plan that has a limit already.
+        // But if there is order-by without a limit, we can simply ignore the order-by.
+        if (plan.getLimit() != null) {
             basePlan = null;
         }
         PhoenixStatement stmt = plan.getContext().getStatement();
@@ -183,12 +183,9 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
     }
     
     private boolean isServerAggregateDoable() {
-        RelNode rel = getInput();
-        if (rel instanceof RelSubset) {
-            rel = ((RelSubset) rel).getBest();
-        }
-        
-        return rel instanceof PhoenixRel && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER;
+        RelNode rel = CalciteUtils.getBestRel(getInput());
+        return rel instanceof PhoenixRel 
+                && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
index 8925ead..0a49477 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
@@ -34,9 +34,6 @@ public class PhoenixFilter extends Filter implements PhoenixRel {
     public QueryPlan implement(Implementor implementor) {
         assert getConvention() == getInput().getConvention();
         QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
-        // TODO: what to do with the Expression?
-        // Already determined this filter cannot be pushed down, so
-        // this will be run 
         Expression expr = CalciteUtils.toExpression(condition, implementor);
         return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(),
                 plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
index c316b5d..0b85217 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
@@ -9,7 +9,6 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
@@ -125,11 +124,10 @@ public class PhoenixJoin extends Join implements PhoenixRel {
     
     private boolean isHashJoinDoable() {
         // TODO check memory limit
-        RelNode rel = getLeft();
-        if (rel instanceof RelSubset) {
-            rel = ((RelSubset) rel).getBest();
-        }
-        return (rel instanceof PhoenixRel && ((PhoenixRel) rel).getPlanType() == PlanType.SERVER_ONLY_FLAT) && getJoinType() != JoinRelType.RIGHT;
+        RelNode rel = CalciteUtils.getBestRel(getLeft());
+        return rel instanceof PhoenixRel
+                && ((PhoenixRel) rel).getPlanType() == PlanType.SERVER_ONLY_FLAT 
+                && getJoinType() != JoinRelType.RIGHT;
     }
     
     private JoinType convertJoinType(JoinRelType type) {
@@ -155,6 +153,6 @@ public class PhoenixJoin extends Join implements PhoenixRel {
 
     @Override
     public PlanType getPlanType() {
-        return PlanType.SERVER_ONLY_COMPLEX;
+        return isHashJoinDoable() ? PlanType.SERVER_ONLY_COMPLEX : PlanType.CLIENT_SERVER;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
index 4f08968..53793d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
@@ -6,13 +6,11 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.execute.TupleProjectionPlan;
 import org.apache.phoenix.execute.TupleProjector;
@@ -71,13 +69,8 @@ public class PhoenixProject extends Project implements PhoenixRel {
 
     @Override
     public PlanType getPlanType() {
-        RelNode rel = getInput();
-        if (rel instanceof RelSubset) {
-            rel = ((RelSubset) rel).getBest();
-        }
-        // TODO this is based on the assumption that there is no two Project 
-        // in a row and Project can be pushed down to the input node if it is 
-        // a server plan.
-        return !(rel instanceof PhoenixRel) ? PlanType.CLIENT_SERVER : ((PhoenixRel) rel).getPlanType();
+        RelNode rel = CalciteUtils.getBestRel(getInput());
+        return rel instanceof PhoenixRel && !(rel instanceof PhoenixProject) ?
+                    ((PhoenixRel) rel).getPlanType() : PlanType.CLIENT_SERVER;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
index e98ee73..843e6de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
@@ -68,7 +68,7 @@ public class PhoenixRules {
                 sort.getTraitSet().replace(out)
                     .replace(sort.getCollation());
             return new PhoenixSort(rel.getCluster(), traitSet,
-                convert(sort.getInput(), sort.getInput().getTraitSet().replace(RelCollationImpl.EMPTY)),
+                convert(sort.getInput(), sort.getInput().getTraitSet().replace(out)),
                 sort.getCollation(), sort.offset, sort.fetch);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
index 6d11231..978e264 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
@@ -1,12 +1,39 @@
 package org.apache.phoenix.calcite;
 
+import java.sql.SQLException;
+import java.util.List;
+
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.BaseQueryPlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.Lists;
 
 /**
  * Implementation of {@link org.apache.calcite.rel.core.Sort}
@@ -15,6 +42,8 @@ import org.apache.phoenix.compile.QueryPlan;
  * <p>Like {@code Sort}, it also supports LIMIT and OFFSET.
  */
 public class PhoenixSort extends Sort implements PhoenixRel {
+    private static final double CLIENT_MERGE_FACTOR = 0.5;
+    
     public PhoenixSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
         super(cluster, traits, child, collation, offset, fetch);
     }
@@ -23,11 +52,91 @@ public class PhoenixSort extends Sort implements PhoenixRel {
     public PhoenixSort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, RexNode offset, RexNode fetch) {
         return new PhoenixSort(getCluster(), traitSet, newInput, newCollation, offset, fetch);
     }
+    
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        RelOptCost cost = super.computeSelfCost(planner);
+        if (isServerSortDoable()) {
+            cost = cost.multiplyBy(SERVER_FACTOR);
+        } else if (isClientSortMergable()) {
+            cost = cost.multiplyBy(CLIENT_MERGE_FACTOR);
+        }
+        return cost.multiplyBy(PHOENIX_FACTOR);
+    }
 
     @Override
     public QueryPlan implement(Implementor implementor) {
-        implementor.visitInput(0, (PhoenixRel) getInput());
-        throw new UnsupportedOperationException();
+        assert getConvention() == getInput().getConvention();
+        if (this.fetch != null || this.offset != null)
+            throw new UnsupportedOperationException();
+            
+        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        TableRef tableRef = implementor.getTableRef();
+        BaseQueryPlan basePlan = null;
+        if (plan instanceof BaseQueryPlan) {
+            basePlan = (BaseQueryPlan) plan;
+        } else if (plan instanceof HashJoinPlan) {
+            QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+            if (delegate instanceof BaseQueryPlan) {
+                basePlan = (BaseQueryPlan) delegate;
+            }
+        }
+        // We can not merge with the base plan that has a limit already.
+        // But if there is order-by without a limit, we can simply ignore the order-by.
+        if (plan.getLimit() != null) {
+            basePlan = null;
+        }
+        PhoenixStatement stmt = plan.getContext().getStatement();
+        StatementContext context;
+        try {
+            context = basePlan == null ? new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)) : basePlan.getContext();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        
+        List<OrderByExpression> orderByExpressions = Lists.newArrayList();
+        for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) {
+            Expression expr = implementor.newColumnExpression(fieldCollation.getFieldIndex());
+            boolean isAscending = fieldCollation.getDirection() == Direction.ASCENDING;
+            if (expr.getSortOrder() == SortOrder.DESC) {
+                isAscending = !isAscending;
+            }
+            orderByExpressions.add(new OrderByExpression(expr, fieldCollation.nullDirection == NullDirection.LAST, isAscending));
+        }
+        OrderBy orderBy = new OrderBy(orderByExpressions);
+        
+        SelectStatement select = SelectStatement.SELECT_STAR;
+        if (basePlan == null) {
+            return new ClientScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan);
+        }
+        
+        QueryPlan newPlan;
+        try {
+            if (basePlan instanceof ScanPlan) {
+                newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy);
+            } else {
+                newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy);
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        
+        if (plan instanceof HashJoinPlan) {        
+            HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
+            newPlan = HashJoinPlan.create(select, newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+        }
+        return newPlan;
+    }
+    
+    private boolean isServerSortDoable() {
+        RelNode rel = CalciteUtils.getBestRel(getInput());
+        return rel instanceof PhoenixRel 
+                && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER;
+    }
+    
+    private boolean isClientSortMergable() {
+        RelNode rel = CalciteUtils.getBestRel(getInput());        
+        return rel instanceof PhoenixAggregate;        
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 215f59e..70b3b97 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -58,7 +58,7 @@ public class OrderByCompiler {
         
         private final List<OrderByExpression> orderByExpressions;
         
-        private OrderBy(List<OrderByExpression> orderByExpressions) {
+        public OrderBy(List<OrderByExpression> orderByExpressions) {
             this.orderByExpressions = ImmutableList.copyOf(orderByExpressions);
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 617cc48..241814c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -71,6 +71,10 @@ public class AggregatePlan extends BaseQueryPlan {
     private final Expression having;
     private List<KeyRange> splits;
     private List<List<Scan>> scans;
+    
+    public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) {
+        return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), plan.getLimit(), newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving());
+    }
 
     public AggregatePlan(
             StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d0a71f4..0dfbcbf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -73,6 +73,10 @@ public class ScanPlan extends BaseQueryPlan {
     private List<List<Scan>> scans;
     private boolean allowPageFilter;
 
+    public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException {
+        return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), plan.getLimit(), newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter);
+    }
+    
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :