You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/13 01:40:37 UTC
phoenix git commit: PHOENIX-1841 Implement PhoenixSort in
Phoenix/Calcite Integration
Repository: phoenix
Updated Branches:
refs/heads/calcite f04eaf172 -> ae0c234fa
PHOENIX-1841 Implement PhoenixSort in Phoenix/Calcite Integration
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ae0c234f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ae0c234f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ae0c234f
Branch: refs/heads/calcite
Commit: ae0c234faf9bd4edac23a7511790104cf86c097e
Parents: f04eaf1
Author: maryannxue <we...@intel.com>
Authored: Sun Apr 12 19:40:22 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Sun Apr 12 19:40:22 2015 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/calcite/CalciteTest.java | 86 +++++++++++++-
.../apache/phoenix/calcite/CalciteUtils.java | 10 ++
.../phoenix/calcite/PhoenixAggregate.java | 15 +--
.../apache/phoenix/calcite/PhoenixFilter.java | 3 -
.../org/apache/phoenix/calcite/PhoenixJoin.java | 12 +-
.../apache/phoenix/calcite/PhoenixProject.java | 13 +--
.../apache/phoenix/calcite/PhoenixRules.java | 2 +-
.../org/apache/phoenix/calcite/PhoenixSort.java | 113 ++++++++++++++++++-
.../apache/phoenix/compile/OrderByCompiler.java | 2 +-
.../apache/phoenix/execute/AggregatePlan.java | 4 +
.../org/apache/phoenix/execute/ScanPlan.java | 4 +
11 files changed, 230 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index 8d2be0d..3f39768 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -345,7 +345,91 @@ public class CalciteTest extends BaseClientManagedTimeIT {
{"a"},
{"b"},
{"c"}})
- .close();;
+ .close();
+ }
+
+ @Test public void testSort() {
+ start().sql("select organization_id, entity_id, a_string from aTable order by a_string, entity_id")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" +
+ " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+ .resultIs(new Object[][] {
+ {"00D300000000XHP", "00A123122312312", "a"},
+ {"00D300000000XHP", "00A223122312312", "a"},
+ {"00D300000000XHP", "00A323122312312", "a"},
+ {"00D300000000XHP", "00A423122312312", "a"},
+ {"00D300000000XHP", "00B523122312312", "b"},
+ {"00D300000000XHP", "00B623122312312", "b"},
+ {"00D300000000XHP", "00B723122312312", "b"},
+ {"00D300000000XHP", "00B823122312312", "b"},
+ {"00D300000000XHP", "00C923122312312", "c"}})
+ .close();
+
+ start().sql("select organization_id, entity_id, a_string from aTable order by organization_id, entity_id")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" +
+ " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+ .resultIs(new Object[][] {
+ {"00D300000000XHP", "00A123122312312", "a"},
+ {"00D300000000XHP", "00A223122312312", "a"},
+ {"00D300000000XHP", "00A323122312312", "a"},
+ {"00D300000000XHP", "00A423122312312", "a"},
+ {"00D300000000XHP", "00B523122312312", "b"},
+ {"00D300000000XHP", "00B623122312312", "b"},
+ {"00D300000000XHP", "00B723122312312", "b"},
+ {"00D300000000XHP", "00B823122312312", "b"},
+ {"00D300000000XHP", "00C923122312312", "c"}})
+ .close();
+
+ start().sql("select count(entity_id), a_string from atable group by a_string order by count(entity_id), a_string desc")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+ " PhoenixSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
+ " PhoenixAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
+ " PhoenixProject(A_STRING=[$2])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+ .resultIs(new Object[][] {
+ {1L, "c"},
+ {4L, "b"},
+ {4L, "a"}})
+ .close();
+
+ start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by count(\"item_id\"), s.name desc")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
+ " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+ " PhoenixProject(NAME=[$1])\n" +
+ " PhoenixJoin(condition=[=($0, $2)], joinType=[inner])\n" +
+ " PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n" +
+ " PhoenixProject(supplier_id=[$5])\n" +
+ " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n")
+ .resultIs(new Object[][] {
+ {"S6", 1L},
+ {"S5", 1L},
+ {"S2", 2L},
+ {"S1", 2L}})
+ .close();
+
+ start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" order by item.name desc")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixSort(sort0=[$1], dir0=[DESC])\n" +
+ " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" +
+ " PhoenixJoin(condition=[=($2, $3)], joinType=[inner])\n" +
+ " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
+ " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
+ " PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
+ .resultIs(new Object[][] {
+ {"0000000006", "T6", "0000000006", "S6"},
+ {"0000000005", "T5", "0000000005", "S5"},
+ {"0000000004", "T4", "0000000002", "S2"},
+ {"0000000003", "T3", "0000000002", "S2"},
+ {"0000000002", "T2", "0000000001", "S1"},
+ {"0000000001", "T1", "0000000001", "S1"}})
+ .close();
}
@Test public void testSubquery() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index 4110b5e..d2a4a31 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -5,6 +5,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
@@ -15,6 +17,7 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.calcite.PhoenixRel.Implementor;
+import org.apache.phoenix.calcite.PhoenixRel.PlanType;
import org.apache.phoenix.expression.ComparisonExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
@@ -39,6 +42,13 @@ public class CalciteUtils {
public static String createTempAlias() {
return "$" + tempAliasCounter.incrementAndGet();
}
+
+ public static RelNode getBestRel(RelNode rel) {
+ if (rel instanceof RelSubset)
+ return ((RelSubset) rel).getBest();
+
+ return rel;
+ }
private static final Map<SqlKind, ExpressionFactory> EXPRESSION_MAP = Maps
.newHashMapWithExpectedSize(ExpressionType.values().length);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
index c3d4982..ca3a34c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
@@ -7,7 +7,6 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
@@ -103,8 +102,9 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
basePlan = (ScanPlan) delegate;
}
}
- // TopN, we can not merge with the base plan.
- if (!plan.getOrderBy().getOrderByExpressions().isEmpty() && plan.getLimit() != null) {
+ // We can not merge with the base plan that has a limit already.
+ // But if there is order-by without a limit, we can simply ignore the order-by.
+ if (plan.getLimit() != null) {
basePlan = null;
}
PhoenixStatement stmt = plan.getContext().getStatement();
@@ -183,12 +183,9 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
}
private boolean isServerAggregateDoable() {
- RelNode rel = getInput();
- if (rel instanceof RelSubset) {
- rel = ((RelSubset) rel).getBest();
- }
-
- return rel instanceof PhoenixRel && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER;
+ RelNode rel = CalciteUtils.getBestRel(getInput());
+ return rel instanceof PhoenixRel
+ && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
index 8925ead..0a49477 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
@@ -34,9 +34,6 @@ public class PhoenixFilter extends Filter implements PhoenixRel {
public QueryPlan implement(Implementor implementor) {
assert getConvention() == getInput().getConvention();
QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
- // TODO: what to do with the Expression?
- // Already determined this filter cannot be pushed down, so
- // this will be run
Expression expr = CalciteUtils.toExpression(condition, implementor);
return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(),
plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
index c316b5d..0b85217 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
@@ -9,7 +9,6 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
@@ -125,11 +124,10 @@ public class PhoenixJoin extends Join implements PhoenixRel {
private boolean isHashJoinDoable() {
// TODO check memory limit
- RelNode rel = getLeft();
- if (rel instanceof RelSubset) {
- rel = ((RelSubset) rel).getBest();
- }
- return (rel instanceof PhoenixRel && ((PhoenixRel) rel).getPlanType() == PlanType.SERVER_ONLY_FLAT) && getJoinType() != JoinRelType.RIGHT;
+ RelNode rel = CalciteUtils.getBestRel(getLeft());
+ return rel instanceof PhoenixRel
+ && ((PhoenixRel) rel).getPlanType() == PlanType.SERVER_ONLY_FLAT
+ && getJoinType() != JoinRelType.RIGHT;
}
private JoinType convertJoinType(JoinRelType type) {
@@ -155,6 +153,6 @@ public class PhoenixJoin extends Join implements PhoenixRel {
@Override
public PlanType getPlanType() {
- return PlanType.SERVER_ONLY_COMPLEX;
+ return isHashJoinDoable() ? PlanType.SERVER_ONLY_COMPLEX : PlanType.CLIENT_SERVER;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
index 4f08968..53793d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
@@ -6,13 +6,11 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.execute.ScanPlan;
import org.apache.phoenix.execute.TupleProjectionPlan;
import org.apache.phoenix.execute.TupleProjector;
@@ -71,13 +69,8 @@ public class PhoenixProject extends Project implements PhoenixRel {
@Override
public PlanType getPlanType() {
- RelNode rel = getInput();
- if (rel instanceof RelSubset) {
- rel = ((RelSubset) rel).getBest();
- }
- // TODO this is based on the assumption that there is no two Project
- // in a row and Project can be pushed down to the input node if it is
- // a server plan.
- return !(rel instanceof PhoenixRel) ? PlanType.CLIENT_SERVER : ((PhoenixRel) rel).getPlanType();
+ RelNode rel = CalciteUtils.getBestRel(getInput());
+ return rel instanceof PhoenixRel && !(rel instanceof PhoenixProject) ?
+ ((PhoenixRel) rel).getPlanType() : PlanType.CLIENT_SERVER;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
index e98ee73..843e6de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
@@ -68,7 +68,7 @@ public class PhoenixRules {
sort.getTraitSet().replace(out)
.replace(sort.getCollation());
return new PhoenixSort(rel.getCluster(), traitSet,
- convert(sort.getInput(), sort.getInput().getTraitSet().replace(RelCollationImpl.EMPTY)),
+ convert(sort.getInput(), sort.getInput().getTraitSet().replace(out)),
sort.getCollation(), sort.offset, sort.fetch);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
index 6d11231..978e264 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
@@ -1,12 +1,39 @@
package org.apache.phoenix.calcite;
+import java.sql.SQLException;
+import java.util.List;
+
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.BaseQueryPlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.Lists;
/**
* Implementation of {@link org.apache.calcite.rel.core.Sort}
@@ -15,6 +42,8 @@ import org.apache.phoenix.compile.QueryPlan;
* <p>Like {@code Sort}, it also supports LIMIT and OFFSET.
*/
public class PhoenixSort extends Sort implements PhoenixRel {
+ private static final double CLIENT_MERGE_FACTOR = 0.5;
+
public PhoenixSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
super(cluster, traits, child, collation, offset, fetch);
}
@@ -23,11 +52,91 @@ public class PhoenixSort extends Sort implements PhoenixRel {
public PhoenixSort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, RexNode offset, RexNode fetch) {
return new PhoenixSort(getCluster(), traitSet, newInput, newCollation, offset, fetch);
}
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ RelOptCost cost = super.computeSelfCost(planner);
+ if (isServerSortDoable()) {
+ cost = cost.multiplyBy(SERVER_FACTOR);
+ } else if (isClientSortMergable()) {
+ cost = cost.multiplyBy(CLIENT_MERGE_FACTOR);
+ }
+ return cost.multiplyBy(PHOENIX_FACTOR);
+ }
@Override
public QueryPlan implement(Implementor implementor) {
- implementor.visitInput(0, (PhoenixRel) getInput());
- throw new UnsupportedOperationException();
+ assert getConvention() == getInput().getConvention();
+ if (this.fetch != null || this.offset != null)
+ throw new UnsupportedOperationException();
+
+ QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ TableRef tableRef = implementor.getTableRef();
+ BaseQueryPlan basePlan = null;
+ if (plan instanceof BaseQueryPlan) {
+ basePlan = (BaseQueryPlan) plan;
+ } else if (plan instanceof HashJoinPlan) {
+ QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+ if (delegate instanceof BaseQueryPlan) {
+ basePlan = (BaseQueryPlan) delegate;
+ }
+ }
+ // We can not merge with the base plan that has a limit already.
+ // But if there is order-by without a limit, we can simply ignore the order-by.
+ if (plan.getLimit() != null) {
+ basePlan = null;
+ }
+ PhoenixStatement stmt = plan.getContext().getStatement();
+ StatementContext context;
+ try {
+ context = basePlan == null ? new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)) : basePlan.getContext();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<OrderByExpression> orderByExpressions = Lists.newArrayList();
+ for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) {
+ Expression expr = implementor.newColumnExpression(fieldCollation.getFieldIndex());
+ boolean isAscending = fieldCollation.getDirection() == Direction.ASCENDING;
+ if (expr.getSortOrder() == SortOrder.DESC) {
+ isAscending = !isAscending;
+ }
+ orderByExpressions.add(new OrderByExpression(expr, fieldCollation.nullDirection == NullDirection.LAST, isAscending));
+ }
+ OrderBy orderBy = new OrderBy(orderByExpressions);
+
+ SelectStatement select = SelectStatement.SELECT_STAR;
+ if (basePlan == null) {
+ return new ClientScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan);
+ }
+
+ QueryPlan newPlan;
+ try {
+ if (basePlan instanceof ScanPlan) {
+ newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy);
+ } else {
+ newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy);
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (plan instanceof HashJoinPlan) {
+ HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
+ newPlan = HashJoinPlan.create(select, newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ }
+ return newPlan;
+ }
+
+ private boolean isServerSortDoable() {
+ RelNode rel = CalciteUtils.getBestRel(getInput());
+ return rel instanceof PhoenixRel
+ && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER;
+ }
+
+ private boolean isClientSortMergable() {
+ RelNode rel = CalciteUtils.getBestRel(getInput());
+ return rel instanceof PhoenixAggregate;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 215f59e..70b3b97 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -58,7 +58,7 @@ public class OrderByCompiler {
private final List<OrderByExpression> orderByExpressions;
- private OrderBy(List<OrderByExpression> orderByExpressions) {
+ public OrderBy(List<OrderByExpression> orderByExpressions) {
this.orderByExpressions = ImmutableList.copyOf(orderByExpressions);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 617cc48..241814c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -71,6 +71,10 @@ public class AggregatePlan extends BaseQueryPlan {
private final Expression having;
private List<KeyRange> splits;
private List<List<Scan>> scans;
+
+ public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) {
+ return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), plan.getLimit(), newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving());
+ }
public AggregatePlan(
StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ae0c234f/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index d0a71f4..0dfbcbf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -73,6 +73,10 @@ public class ScanPlan extends BaseQueryPlan {
private List<List<Scan>> scans;
private boolean allowPageFilter;
+ public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException {
+ return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), plan.getLimit(), newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter);
+ }
+
public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
parallelIteratorFactory != null ? parallelIteratorFactory :