You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/14 18:01:14 UTC
phoenix git commit: Add PhoenixCompactClientSort for merging OrderBy into AggregatePlan
Repository: phoenix
Updated Branches:
refs/heads/calcite 6a8c37629 -> fa60f8ca7
Add PhoenixCompactClientSort for merging OrderBy into AggregatePlan
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fa60f8ca
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fa60f8ca
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fa60f8ca
Branch: refs/heads/calcite
Commit: fa60f8ca7c9dea4333fcc55d196385b885bafec5
Parents: 6a8c376
Author: maryannxue <we...@intel.com>
Authored: Tue Apr 14 12:00:54 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Tue Apr 14 12:00:54 2015 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/calcite/CalciteTest.java | 6 ++--
.../phoenix/calcite/PhoenixClientSort.java | 2 +-
.../calcite/PhoenixCompactClientSort.java | 33 ++++++++++++++------
.../phoenix/calcite/PhoenixServerAggregate.java | 13 ++++----
.../phoenix/calcite/PhoenixServerSort.java | 15 ++++-----
.../org/apache/phoenix/calcite/PhoenixSort.java | 8 +++--
.../phoenix/calcite/PhoenixTableScan.java | 2 +-
.../apache/phoenix/execute/HashJoinPlan.java | 8 +++--
.../phoenix/execute/TupleProjectionPlan.java | 8 +++++
9 files changed, 63 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index df870e1..9d2dbea 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -385,8 +385,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("select count(entity_id), a_string from atable group by a_string order by count(entity_id), a_string desc")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixClientSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n" +
- " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+ " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+ " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
" PhoenixServerAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
" PhoenixServerProject(A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
@@ -398,7 +398,7 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by count(\"item_id\"), s.name desc")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
+ " PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
" PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
" PhoenixServerProject(NAME=[$2])\n" +
" PhoenixServerJoin(condition=[=($1, $0)], joinType=[inner])\n" +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
index 6bb67fb..417bda8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
@@ -56,7 +56,7 @@ public class PhoenixClientSort extends PhoenixSort {
throw new RuntimeException(e);
}
- OrderBy orderBy = super.getOrderBy(implementor);
+ OrderBy orderBy = super.getOrderBy(implementor, null);
Integer limit = super.getLimit(implementor);
return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, limit, null, orderBy, plan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
index 7037598..40a0697 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
@@ -11,7 +11,8 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.AggregatePlan;
-import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
public class PhoenixCompactClientSort extends PhoenixSort {
@@ -40,26 +41,38 @@ public class PhoenixCompactClientSort extends PhoenixSort {
throw new UnsupportedOperationException();
QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
- assert (plan instanceof AggregatePlan || plan instanceof HashJoinPlan)
- && plan.getLimit() == null;
+ assert plan instanceof TupleProjectionPlan;
+
+ // PhoenixServerAggregate wraps the AggregatePlan with a TupleProjectionPlan,
+ // so we need to unwrap the TupleProjectionPlan.
+ TupleProjectionPlan tupleProjectionPlan = (TupleProjectionPlan) plan;
+ assert tupleProjectionPlan.getPostFilter() == null;
+ QueryPlan innerPlan = tupleProjectionPlan.getDelegate();
+ TupleProjector tupleProjector = tupleProjectionPlan.getTupleProjector();
+ assert (innerPlan instanceof AggregatePlan
+ || innerPlan instanceof HashJoinPlan)
+ && innerPlan.getLimit() == null;
AggregatePlan basePlan;
- if (plan instanceof AggregatePlan) {
- basePlan = (AggregatePlan) plan;
+ HashJoinPlan hashJoinPlan = null;
+ if (innerPlan instanceof AggregatePlan) {
+ basePlan = (AggregatePlan) innerPlan;
} else {
- QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+ hashJoinPlan = (HashJoinPlan) innerPlan;
+ QueryPlan delegate = hashJoinPlan.getDelegate();
assert delegate instanceof AggregatePlan;
basePlan = (AggregatePlan) delegate;
}
- OrderBy orderBy = super.getOrderBy(implementor);
+ OrderBy orderBy = super.getOrderBy(implementor, tupleProjector);
Integer limit = super.getLimit(implementor);
QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy, limit);
- if (plan instanceof HashJoinPlan) {
- HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
- newPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ if (hashJoinPlan != null) {
+ newPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
}
+ // Recover the wrapping of TupleProjectionPlan
+ newPlan = new TupleProjectionPlan(newPlan, tupleProjector, null);
return newPlan;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
index a535a35..bdff153 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
@@ -18,7 +18,6 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.parse.SelectStatement;
public class PhoenixServerAggregate extends PhoenixAggregate {
@@ -45,14 +44,17 @@ public class PhoenixServerAggregate extends PhoenixAggregate {
assert getConvention() == getInput().getConvention();
QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
- assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan)
+ assert (plan instanceof ScanPlan
+ || plan instanceof HashJoinPlan)
&& plan.getLimit() == null;
ScanPlan basePlan;
+ HashJoinPlan hashJoinPlan = null;
if (plan instanceof ScanPlan) {
basePlan = (ScanPlan) plan;
} else {
- QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+ hashJoinPlan = (HashJoinPlan) plan;
+ QueryPlan delegate = hashJoinPlan.getDelegate();
assert delegate instanceof ScanPlan;
basePlan = (ScanPlan) delegate;
}
@@ -62,9 +64,8 @@ public class PhoenixServerAggregate extends PhoenixAggregate {
super.serializeAggregators(implementor, context, groupBy.isEmpty());
QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
- if (plan instanceof HashJoinPlan) {
- HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
- aggPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ if (hashJoinPlan != null) {
+ aggPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
}
return PhoenixAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
index 1538309..17516dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
@@ -13,7 +13,6 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.parse.SelectStatement;
public class PhoenixServerSort extends PhoenixSort {
@@ -42,19 +41,22 @@ public class PhoenixServerSort extends PhoenixSort {
throw new UnsupportedOperationException();
QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
- assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan)
+ assert (plan instanceof ScanPlan
+ || plan instanceof HashJoinPlan)
&& plan.getLimit() == null;
ScanPlan basePlan;
+ HashJoinPlan hashJoinPlan = null;
if (plan instanceof ScanPlan) {
basePlan = (ScanPlan) plan;
} else {
- QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+ hashJoinPlan = (HashJoinPlan) plan;
+ QueryPlan delegate = hashJoinPlan.getDelegate();
assert delegate instanceof ScanPlan;
basePlan = (ScanPlan) delegate;
}
- OrderBy orderBy = super.getOrderBy(implementor);
+ OrderBy orderBy = super.getOrderBy(implementor, null);
Integer limit = super.getLimit(implementor);
QueryPlan newPlan;
try {
@@ -63,9 +65,8 @@ public class PhoenixServerSort extends PhoenixSort {
throw new RuntimeException(e);
}
- if (plan instanceof HashJoinPlan) {
- HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
- newPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ if (hashJoinPlan != null) {
+ newPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
}
return newPlan;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
index 1b28973..2338b5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
@@ -12,9 +12,11 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
import org.apache.phoenix.schema.SortOrder;
+
import com.google.common.collect.Lists;
/**
@@ -31,12 +33,14 @@ abstract public class PhoenixSort extends Sort implements PhoenixRel {
assert getConvention() == PhoenixRel.CONVENTION;
}
- protected OrderBy getOrderBy(Implementor implementor) {
+ protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector) {
assert !getCollation().getFieldCollations().isEmpty();
List<OrderByExpression> orderByExpressions = Lists.newArrayList();
for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) {
- Expression expr = implementor.newColumnExpression(fieldCollation.getFieldIndex());
+ Expression expr = tupleProjector == null ?
+ implementor.newColumnExpression(fieldCollation.getFieldIndex())
+ : tupleProjector.getExpressions()[fieldCollation.getFieldIndex()];
boolean isAscending = fieldCollation.getDirection() == Direction.ASCENDING;
if (expr.getSortOrder() == SortOrder.DESC) {
isAscending = !isAscending;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
index c5db7f1..6044739 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
@@ -81,7 +81,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
planner.addRule(PhoenixServerSortRule.SORT_SCAN);
planner.addRule(PhoenixServerSortRule.SORT_SERVERJOIN);
planner.addRule(PhoenixServerSortRule.SORT_SERVERPROJECT);
- // TODO planner.addRule(PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE);
+ planner.addRule(PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 0f6edc3..cc2a244 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -54,7 +54,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.parse.SelectStatement;
@@ -118,6 +117,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
public SubPlan[] getSubPlans() {
return this.subPlans;
}
+
+ @Override
+ public Integer getLimit() {
+ return this.joinInfo == null ? null : this.joinInfo.getLimit();
+ }
@Override
public ResultIterator iterator() throws SQLException {
@@ -232,7 +236,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
}
@Override
- public FilterableStatement getStatement() {
+ public SelectStatement getStatement() {
return statement;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index c9cbd15..c2984b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -40,6 +40,14 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
this.tupleProjector = tupleProjector;
this.postFilter = postFilter;
}
+
+ public TupleProjector getTupleProjector() {
+ return this.tupleProjector;
+ }
+
+ public Expression getPostFilter() {
+ return this.postFilter;
+ }
@Override
public ExplainPlan getExplainPlan() throws SQLException {