You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/14 03:36:48 UTC
[1/2] phoenix git commit: PHOENIX-1845 Implement two sets (server and client) of Phoenix conventions
Repository: phoenix
Updated Branches:
refs/heads/calcite ae0c234fa -> 8cde1c54e
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
index 978e264..1b28973 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
@@ -1,11 +1,8 @@
package org.apache.phoenix.calcite;
-import java.sql.SQLException;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
@@ -14,25 +11,10 @@ import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexNode;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.phoenix.compile.FromCompiler;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.execute.AggregatePlan;
-import org.apache.phoenix.execute.BaseQueryPlan;
-import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.HashJoinPlan;
-import org.apache.phoenix.execute.ScanPlan;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.OrderByExpression;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.TableRef;
-
import com.google.common.collect.Lists;
/**
@@ -41,58 +23,16 @@ import com.google.common.collect.Lists;
*
* <p>Like {@code Sort}, it also supports LIMIT and OFFSET.
*/
-public class PhoenixSort extends Sort implements PhoenixRel {
- private static final double CLIENT_MERGE_FACTOR = 0.5;
+abstract public class PhoenixSort extends Sort implements PhoenixRel {
+ protected static final double CLIENT_MERGE_FACTOR = 0.5;
public PhoenixSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
super(cluster, traits, child, collation, offset, fetch);
- }
-
- @Override
- public PhoenixSort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, RexNode offset, RexNode fetch) {
- return new PhoenixSort(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ assert getConvention() == PhoenixRel.CONVENTION;
}
- @Override
- public RelOptCost computeSelfCost(RelOptPlanner planner) {
- RelOptCost cost = super.computeSelfCost(planner);
- if (isServerSortDoable()) {
- cost = cost.multiplyBy(SERVER_FACTOR);
- } else if (isClientSortMergable()) {
- cost = cost.multiplyBy(CLIENT_MERGE_FACTOR);
- }
- return cost.multiplyBy(PHOENIX_FACTOR);
- }
-
- @Override
- public QueryPlan implement(Implementor implementor) {
- assert getConvention() == getInput().getConvention();
- if (this.fetch != null || this.offset != null)
- throw new UnsupportedOperationException();
-
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
- TableRef tableRef = implementor.getTableRef();
- BaseQueryPlan basePlan = null;
- if (plan instanceof BaseQueryPlan) {
- basePlan = (BaseQueryPlan) plan;
- } else if (plan instanceof HashJoinPlan) {
- QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
- if (delegate instanceof BaseQueryPlan) {
- basePlan = (BaseQueryPlan) delegate;
- }
- }
- // We can not merge with the base plan that has a limit already.
- // But if there is order-by without a limit, we can simply ignore the order-by.
- if (plan.getLimit() != null) {
- basePlan = null;
- }
- PhoenixStatement stmt = plan.getContext().getStatement();
- StatementContext context;
- try {
- context = basePlan == null ? new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)) : basePlan.getContext();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ protected OrderBy getOrderBy(Implementor implementor) {
+ assert !getCollation().getFieldCollations().isEmpty();
List<OrderByExpression> orderByExpressions = Lists.newArrayList();
for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) {
@@ -103,44 +43,12 @@ public class PhoenixSort extends Sort implements PhoenixRel {
}
orderByExpressions.add(new OrderByExpression(expr, fieldCollation.nullDirection == NullDirection.LAST, isAscending));
}
- OrderBy orderBy = new OrderBy(orderByExpressions);
- SelectStatement select = SelectStatement.SELECT_STAR;
- if (basePlan == null) {
- return new ClientScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan);
- }
-
- QueryPlan newPlan;
- try {
- if (basePlan instanceof ScanPlan) {
- newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy);
- } else {
- newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy);
- }
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
-
- if (plan instanceof HashJoinPlan) {
- HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
- newPlan = HashJoinPlan.create(select, newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
- }
- return newPlan;
- }
-
- private boolean isServerSortDoable() {
- RelNode rel = CalciteUtils.getBestRel(getInput());
- return rel instanceof PhoenixRel
- && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER;
+ return new OrderBy(orderByExpressions);
}
- private boolean isClientSortMergable() {
- RelNode rel = CalciteUtils.getBestRel(getInput());
- return rel instanceof PhoenixAggregate;
- }
-
- @Override
- public PlanType getPlanType() {
- return PlanType.CLIENT_SERVER;
+ protected Integer getLimit(Implementor implementor) {
+ // TODO
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
index 8b437bc..e86045a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
@@ -16,6 +16,11 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.calcite.rules.PhoenixCompactClientSortRule;
+import org.apache.phoenix.calcite.rules.PhoenixServerAggregateRule;
+import org.apache.phoenix.calcite.rules.PhoenixServerJoinRule;
+import org.apache.phoenix.calcite.rules.PhoenixServerProjectRule;
+import org.apache.phoenix.calcite.rules.PhoenixServerSortRule;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -64,6 +69,17 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
planner.addRule(rule);
}
planner.addRule(PhoenixFilterScanMergeRule.INSTANCE);
+ planner.addRule(PhoenixServerProjectRule.PROJECT_SCAN);
+ planner.addRule(PhoenixServerProjectRule.PROJECT_SERVERJOIN);
+ planner.addRule(PhoenixServerJoinRule.JOIN_SCAN);
+ planner.addRule(PhoenixServerJoinRule.JOIN_SERVERPROJECT_SCAN);
+ planner.addRule(PhoenixServerAggregateRule.AGGREGATE_SCAN);
+ planner.addRule(PhoenixServerAggregateRule.AGGREGATE_SERVERJOIN);
+ planner.addRule(PhoenixServerAggregateRule.AGGREGATE_SERVERPROJECT);
+ planner.addRule(PhoenixServerSortRule.SORT_SCAN);
+ planner.addRule(PhoenixServerSortRule.SORT_SERVERJOIN);
+ planner.addRule(PhoenixServerSortRule.SORT_SERVERPROJECT);
+ // TODO planner.addRule(PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE);
}
@Override
@@ -141,9 +157,4 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
scan.addFamily(family.getName().getBytes());
}
}
-
- @Override
- public PlanType getPlanType() {
- return PlanType.SERVER_ONLY_FLAT;
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java
index c824246..cc76334 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixUnion.java
@@ -34,9 +34,4 @@ public class PhoenixUnion extends Union implements PhoenixRel {
}
throw new UnsupportedOperationException();
}
-
- @Override
- public PlanType getPlanType() {
- return PlanType.CLIENT_SERVER;
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java
index 92bc676..6e01abb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java
@@ -34,9 +34,4 @@ public class PhoenixValues extends Values implements PhoenixRel {
public QueryPlan implement(Implementor implementor) {
throw new UnsupportedOperationException();
}
-
- @Override
- public PlanType getPlanType() {
- return PlanType.SERVER_ONLY_FLAT;
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java
new file mode 100644
index 0000000..63cd60e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java
@@ -0,0 +1,30 @@
+package org.apache.phoenix.calcite.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.phoenix.calcite.PhoenixClientSort;
+import org.apache.phoenix.calcite.PhoenixCompactClientSort;
+import org.apache.phoenix.calcite.PhoenixRel;
+import org.apache.phoenix.calcite.PhoenixServerAggregate;
+
+public class PhoenixCompactClientSortRule extends RelOptRule {
+
+ public static final PhoenixCompactClientSortRule SORT_SERVERAGGREGATE =
+ new PhoenixCompactClientSortRule("PhoenixCompactClientSortRule:sort_serveraggregate", PhoenixServerAggregate.class);
+
+ public PhoenixCompactClientSortRule(String description, Class<? extends PhoenixRel> clazz) {
+ super(
+ operand(PhoenixClientSort.class,
+ operand(clazz, any())),
+ description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ PhoenixClientSort sort = call.rel(0);
+ PhoenixRel input = call.rel(1);
+ call.transformTo(new PhoenixCompactClientSort(sort.getCluster(),
+ sort.getTraitSet(), input, sort.getCollation(), sort.offset, sort.fetch));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerAggregateRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerAggregateRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerAggregateRule.java
new file mode 100644
index 0000000..969a433
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerAggregateRule.java
@@ -0,0 +1,39 @@
+package org.apache.phoenix.calcite.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.phoenix.calcite.PhoenixClientAggregate;
+import org.apache.phoenix.calcite.PhoenixRel;
+import org.apache.phoenix.calcite.PhoenixServerAggregate;
+import org.apache.phoenix.calcite.PhoenixServerJoin;
+import org.apache.phoenix.calcite.PhoenixServerProject;
+import org.apache.phoenix.calcite.PhoenixTableScan;
+
+public class PhoenixServerAggregateRule extends RelOptRule {
+
+ public static final PhoenixServerAggregateRule AGGREGATE_SCAN =
+ new PhoenixServerAggregateRule("PhoenixServerAggregateRule:aggregate_scan", PhoenixTableScan.class);
+
+ public static final PhoenixServerAggregateRule AGGREGATE_SERVERJOIN =
+ new PhoenixServerAggregateRule("PhoenixServerAggregateRule:aggregate_serverjoin", PhoenixServerJoin.class);
+
+ public static final PhoenixServerAggregateRule AGGREGATE_SERVERPROJECT =
+ new PhoenixServerAggregateRule("PhoenixServerAggregateRule:aggregate_serverproject", PhoenixServerProject.class);
+
+ public PhoenixServerAggregateRule(String description, Class<? extends PhoenixRel> clazz) {
+ super(
+ operand(PhoenixClientAggregate.class,
+ operand(clazz, any())),
+ description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ PhoenixClientAggregate aggregate = call.rel(0);
+ PhoenixRel input = call.rel(1);
+ call.transformTo(new PhoenixServerAggregate(aggregate.getCluster(),
+ aggregate.getTraitSet(), input, aggregate.indicator,
+ aggregate.getGroupSet(), aggregate.getGroupSets(), aggregate.getAggCallList()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerJoinRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerJoinRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerJoinRule.java
new file mode 100644
index 0000000..3c15ea9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerJoinRule.java
@@ -0,0 +1,54 @@
+package org.apache.phoenix.calcite.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.phoenix.calcite.PhoenixClientJoin;
+import org.apache.phoenix.calcite.PhoenixRel;
+import org.apache.phoenix.calcite.PhoenixServerJoin;
+import org.apache.phoenix.calcite.PhoenixServerProject;
+import org.apache.phoenix.calcite.PhoenixTableScan;
+
+import com.google.common.base.Predicate;
+
+public class PhoenixServerJoinRule extends RelOptRule {
+
+ /** Predicate that returns true if a join type is not right or full. */
+ private static final Predicate<PhoenixClientJoin> NO_RIGHT_OR_FULL =
+ new Predicate<PhoenixClientJoin>() {
+ @Override
+ public boolean apply(PhoenixClientJoin phoenixClientJoin) {
+ return phoenixClientJoin.getJoinType() != JoinRelType.RIGHT
+ && phoenixClientJoin.getJoinType() != JoinRelType.FULL;
+ }
+ };
+
+ public static final PhoenixServerJoinRule JOIN_SCAN =
+ new PhoenixServerJoinRule("PhoenixServerJoinRule:join_scan",
+ operand(PhoenixTableScan.class, any()));
+
+ public static final PhoenixServerJoinRule JOIN_SERVERPROJECT_SCAN =
+ new PhoenixServerJoinRule("PhoenixServerJoinRule:join_serverproject_scan",
+ operand(PhoenixServerProject.class,
+ operand(PhoenixTableScan.class, any())));
+
+ public PhoenixServerJoinRule(String description, RelOptRuleOperand left) {
+ super(
+ operand(PhoenixClientJoin.class, null, NO_RIGHT_OR_FULL,
+ left,
+ operand(PhoenixRel.class, any())),
+ description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ PhoenixClientJoin join = call.rel(0);
+ PhoenixRel left = call.rel(1);
+ PhoenixRel right = call.rel(call.getRelList().size() - 1);
+ call.transformTo(new PhoenixServerJoin(join.getCluster(),
+ join.getTraitSet(), left, right, join.getCondition(),
+ join.getJoinType(), join.getVariablesStopped()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerProjectRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerProjectRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerProjectRule.java
new file mode 100644
index 0000000..05bd332
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerProjectRule.java
@@ -0,0 +1,34 @@
+package org.apache.phoenix.calcite.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.phoenix.calcite.PhoenixClientProject;
+import org.apache.phoenix.calcite.PhoenixRel;
+import org.apache.phoenix.calcite.PhoenixServerJoin;
+import org.apache.phoenix.calcite.PhoenixServerProject;
+import org.apache.phoenix.calcite.PhoenixTableScan;
+
+public class PhoenixServerProjectRule extends RelOptRule {
+
+ public static final PhoenixServerProjectRule PROJECT_SCAN =
+ new PhoenixServerProjectRule("PhoenixServerProjectRule:project_scan", PhoenixTableScan.class);
+
+ public static final PhoenixServerProjectRule PROJECT_SERVERJOIN =
+ new PhoenixServerProjectRule("PhoenixServerProjectRule:project_serverjoin", PhoenixServerJoin.class);
+
+ public PhoenixServerProjectRule(String description, Class<? extends PhoenixRel> clazz) {
+ super(
+ operand(PhoenixClientProject.class,
+ operand(clazz, any())),
+ description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ PhoenixClientProject project = call.rel(0);
+ PhoenixRel input = call.rel(1);
+ call.transformTo(new PhoenixServerProject(project.getCluster(),
+ project.getTraitSet(), input, project.getProjects(), project.getRowType()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java
new file mode 100644
index 0000000..0bd517f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixServerSortRule.java
@@ -0,0 +1,38 @@
+package org.apache.phoenix.calcite.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.phoenix.calcite.PhoenixClientSort;
+import org.apache.phoenix.calcite.PhoenixRel;
+import org.apache.phoenix.calcite.PhoenixServerJoin;
+import org.apache.phoenix.calcite.PhoenixServerProject;
+import org.apache.phoenix.calcite.PhoenixServerSort;
+import org.apache.phoenix.calcite.PhoenixTableScan;
+
+public class PhoenixServerSortRule extends RelOptRule {
+
+ public static final PhoenixServerSortRule SORT_SCAN =
+ new PhoenixServerSortRule("PhoenixServerSortRule:sort_scan", PhoenixTableScan.class);
+
+ public static final PhoenixServerSortRule SORT_SERVERJOIN =
+ new PhoenixServerSortRule("PhoenixServerSortRule:sort_serverjoin", PhoenixServerJoin.class);
+
+ public static final PhoenixServerSortRule SORT_SERVERPROJECT =
+ new PhoenixServerSortRule("PhoenixServerSortRule:sort_serverproject", PhoenixServerProject.class);
+
+ public PhoenixServerSortRule(String description, Class<? extends PhoenixRel> clazz) {
+ super(
+ operand(PhoenixClientSort.class,
+ operand(clazz, any())),
+ description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ PhoenixClientSort sort = call.rel(0);
+ PhoenixRel input = call.rel(1);
+ call.transformTo(new PhoenixServerSort(sort.getCluster(),
+ sort.getTraitSet(), input, sort.getCollation(), sort.offset, sort.fetch));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 241814c..db16d11 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -72,8 +72,8 @@ public class AggregatePlan extends BaseQueryPlan {
private List<KeyRange> splits;
private List<List<Scan>> scans;
- public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) {
- return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), plan.getLimit(), newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving());
+ public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy, Integer newLimit) {
+ return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), newLimit, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving());
}
public AggregatePlan(
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 0dfbcbf..021da04 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -73,8 +73,8 @@ public class ScanPlan extends BaseQueryPlan {
private List<List<Scan>> scans;
private boolean allowPageFilter;
- public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException {
- return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), plan.getLimit(), newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter);
+ public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy, Integer newLimit) throws SQLException {
+ return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), newLimit, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter);
}
public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
[2/2] phoenix git commit: PHOENIX-1845 Implement two sets (server and client) of Phoenix conventions
Posted by ma...@apache.org.
PHOENIX-1845 Implement two sets (server and client) of Phoenix conventions
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8cde1c54
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8cde1c54
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8cde1c54
Branch: refs/heads/calcite
Commit: 8cde1c54ec4aafd0b16bf2fd2eccec6704e51909
Parents: ae0c234
Author: maryannxue <we...@intel.com>
Authored: Mon Apr 13 21:36:35 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Mon Apr 13 21:36:35 2015 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/calcite/CalciteTest.java | 122 +++++++++---------
.../apache/phoenix/calcite/CalciteUtils.java | 10 --
.../phoenix/calcite/PhoenixAggregate.java | 114 +++--------------
.../phoenix/calcite/PhoenixClientAggregate.java | 69 +++++++++++
.../phoenix/calcite/PhoenixClientJoin.java | 55 ++++++++
.../phoenix/calcite/PhoenixClientProject.java | 45 +++++++
.../phoenix/calcite/PhoenixClientSort.java | 65 ++++++++++
.../calcite/PhoenixCompactClientSort.java | 66 ++++++++++
.../apache/phoenix/calcite/PhoenixFilter.java | 5 -
.../org/apache/phoenix/calcite/PhoenixJoin.java | 119 +-----------------
.../apache/phoenix/calcite/PhoenixProject.java | 45 +------
.../org/apache/phoenix/calcite/PhoenixRel.java | 8 --
.../apache/phoenix/calcite/PhoenixRules.java | 33 ++---
.../phoenix/calcite/PhoenixServerAggregate.java | 73 +++++++++++
.../phoenix/calcite/PhoenixServerJoin.java | 124 +++++++++++++++++++
.../phoenix/calcite/PhoenixServerProject.java | 51 ++++++++
.../phoenix/calcite/PhoenixServerSort.java | 73 +++++++++++
.../org/apache/phoenix/calcite/PhoenixSort.java | 110 ++--------------
.../phoenix/calcite/PhoenixTableScan.java | 21 +++-
.../apache/phoenix/calcite/PhoenixUnion.java | 5 -
.../apache/phoenix/calcite/PhoenixValues.java | 5 -
.../rules/PhoenixCompactClientSortRule.java | 30 +++++
.../rules/PhoenixServerAggregateRule.java | 39 ++++++
.../calcite/rules/PhoenixServerJoinRule.java | 54 ++++++++
.../calcite/rules/PhoenixServerProjectRule.java | 34 +++++
.../calcite/rules/PhoenixServerSortRule.java | 38 ++++++
.../apache/phoenix/execute/AggregatePlan.java | 4 +-
.../org/apache/phoenix/execute/ScanPlan.java | 4 +-
28 files changed, 943 insertions(+), 478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index 3f39768..59858e3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -201,7 +201,7 @@ public class CalciteTest extends BaseClientManagedTimeIT {
@Test public void testProject() throws Exception {
start().sql("select entity_id, a_string, organization_id from aTable where a_string = 'a'")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(ENTITY_ID=[$1], A_STRING=[$2], ORGANIZATION_ID=[$0])\n" +
+ " PhoenixServerProject(ENTITY_ID=[$1], A_STRING=[$2], ORGANIZATION_ID=[$0])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n")
.resultIs(new Object[][] {
{"00A123122312312", "a", "00D300000000XHP"},
@@ -214,11 +214,11 @@ public class CalciteTest extends BaseClientManagedTimeIT {
@Test public void testJoin() throws Exception {
start().sql("select t1.entity_id, t2.a_string, t1.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id where t1.a_string = 'a'")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(ENTITY_ID=[$4], A_STRING=[$2], ORGANIZATION_ID=[$3])\n" +
- " PhoenixJoin(condition=[AND(=($4, $1), =($3, $0))], joinType=[inner])\n" +
- " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+ " PhoenixServerProject(ENTITY_ID=[$4], A_STRING=[$2], ORGANIZATION_ID=[$3])\n" +
+ " PhoenixServerJoin(condition=[AND(=($1, $4), =($0, $3))], joinType=[inner])\n" +
+ " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
- " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+ " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n")
.resultIs(new Object[][] {
{"00A123122312312", "a", "00D300000000XHP"},
@@ -229,11 +229,11 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\"")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" +
- " PhoenixJoin(condition=[=($2, $3)], joinType=[inner])\n" +
- " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
+ " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" +
+ " PhoenixServerJoin(condition=[=($2, $3)], joinType=[inner])\n" +
+ " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
" PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
- " PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
" PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
.resultIs(new Object[][] {
{"0000000001", "T1", "0000000001", "S1"},
@@ -246,10 +246,10 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("SELECT * FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND supp.name = 'S5'")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(item_id=[$0], NAME=[$1], PRICE=[$2], DISCOUNT1=[$3], DISCOUNT2=[$4], supplier_id=[$5], DESCRIPTION=[$6], supplier_id0=[$7], NAME0=[$8], PHONE=[$9], ADDRESS=[$10], LOC_ID=[$11])\n" +
- " PhoenixJoin(condition=[=($5, $7)], joinType=[inner])\n" +
+ " PhoenixServerProject(item_id=[$0], NAME=[$1], PRICE=[$2], DISCOUNT1=[$3], DISCOUNT2=[$4], supplier_id=[$5], DESCRIPTION=[$6], supplier_id0=[$7], NAME0=[$8], PHONE=[$9], ADDRESS=[$10], LOC_ID=[$11])\n" +
+ " PhoenixServerJoin(condition=[=($5, $7)], joinType=[inner])\n" +
" PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
- " PhoenixProject(supplier_id=[$0], NAME=[$1], PHONE=[$2], ADDRESS=[$3], LOC_ID=[$4], $f5=[CAST($1):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n" +
+ " PhoenixServerProject(supplier_id=[$0], NAME=[$1], PHONE=[$2], ADDRESS=[$3], LOC_ID=[$4], $f5=[CAST($1):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n" +
" PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]], filter=[=(CAST($1):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, 'S5')])\n")
.resultIs(new Object[][] {
{"0000000005", "T5", 500, 8, 15, "0000000005", "Item T5", "0000000005", "S5", "888-888-5555", "505 YYY Street", "10005"}})
@@ -259,11 +259,11 @@ public class CalciteTest extends BaseClientManagedTimeIT {
@Test public void testMultiJoin() throws Exception {
start().sql("select t1.entity_id, t2.a_string, t3.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id join atable t3 on t1.entity_id = t3.entity_id and t1.organization_id = t3.organization_id where t1.a_string = 'a'")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], ORGANIZATION_ID=[$36])\n" +
- " PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n" +
+ " PhoenixServerProject(ENTITY_ID=[$19], A_STRING=[$38], ORGANIZATION_ID=[$0])\n" +
+ " PhoenixServerJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
- " PhoenixProject(ORGANIZATION_ID=[$18], ENTITY_ID=[$19], A_STRING=[$20], B_STRING=[$21], A_INTEGER=[$22], A_DATE=[$23], A_TIME=[$24], A_TIMESTAMP=[$25], X_DECIMAL=[$26], X_LONG=[$27], X_INTEGER=[$28], Y_INTEGER=[$29], A_BYTE=[$30], A_SHORT=[$31], A_FLOAT=[$32], A_DOUBLE=[$33], A_UNSIGNED_FLOAT=[$34], A_UNSIGNED_DOUBLE=[$35], ORGANIZATION_ID0=[$0], ENTITY_ID0=[$1], A_STRING0=[$2], B_STRING0=[$3], A_INTEGER0=[$4], A_DATE0=[$5], A_TIME0=[$6], A_TIMESTAMP0=[$7], X_DECIMAL0=[$8], X_LONG0=[$9], X_INTEGER0=[$10], Y_INTEGER0=[$11], A_BYTE0=[$12], A_SHORT0=[$13], A_FLOAT0=[$14], A_DOUBLE0=[$15], A_UNSIGNED_FLOAT0=[$16], A_UNSIGNED_DOUBLE0=[$17])\n" +
- " PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n" +
+ " PhoenixServerProject(ORGANIZATION_ID=[$18], ENTITY_ID=[$19], A_STRING=[$20], B_STRING=[$21], A_INTEGER=[$22], A_DATE=[$23], A_TIME=[$24], A_TIMESTAMP=[$25], X_DECIMAL=[$26], X_LONG=[$27], X_INTEGER=[$28], Y_INTEGER=[$29], A_BYTE=[$30], A_SHORT=[$31], A_FLOAT=[$32], A_DOUBLE=[$33], A_UNSIGNED_FLOAT=[$34], A_UNSIGNED_DOUBLE=[$35], ORGANIZATION_ID0=[$0], ENTITY_ID0=[$1], A_STRING0=[$2], B_STRING0=[$3], A_INTEGER0=[$4], A_DATE0=[$5], A_TIME0=[$6], A_TIMESTAMP0=[$7], X_DECIMAL0=[$8], X_LONG0=[$9], X_INTEGER0=[$10], Y_INTEGER0=[$11], A_BYTE0=[$12], A_SHORT0=[$13], A_FLOAT0=[$14], A_DOUBLE0=[$15], A_UNSIGNED_FLOAT0=[$16], A_UNSIGNED_DOUBLE0=[$17])\n" +
+ " PhoenixServerJoin(condition=[AND(=($1, $19), =($0, $18))], joinType=[inner])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]], filter=[=($2, 'a')])\n")
.resultIs(new Object[][] {
@@ -275,10 +275,10 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("select t1.entity_id, t2.a_string, t3.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id join atable t3 on t1.entity_id = t3.entity_id and t1.organization_id = t3.organization_id")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], ORGANIZATION_ID=[$36])\n" +
- " PhoenixJoin(condition=[AND(=($19, $1), =($18, $0))], joinType=[inner])\n" +
+ " PhoenixServerProject(ENTITY_ID=[$19], A_STRING=[$38], ORGANIZATION_ID=[$0])\n" +
+ " PhoenixServerJoin(condition=[AND(=($1, $19), =($0, $18))], joinType=[inner])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
- " PhoenixJoin(condition=[AND(=($1, $19), =($0, $18))], joinType=[inner])\n" +
+ " PhoenixServerJoin(condition=[AND(=($1, $19), =($0, $18))], joinType=[inner])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
.resultIs(new Object[][] {
@@ -297,8 +297,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
@Test public void testAggregate() {
start().sql("select a_string, count(entity_id) from atable group by a_string")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
- " PhoenixProject(A_STRING=[$2])\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+ " PhoenixServerProject(A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
.resultIs(new Object[][] {
{"a", 4L},
@@ -308,9 +308,9 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("select count(entity_id), a_string from atable group by a_string")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(EXPR$0=[$1], A_STRING=[$0])\n" +
- " PhoenixAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
- " PhoenixProject(A_STRING=[$2])\n" +
+ " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
+ " PhoenixServerProject(A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
.resultIs(new Object[][] {
{4L, "a"},
@@ -320,12 +320,12 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
- " PhoenixProject(NAME=[$1])\n" +
- " PhoenixJoin(condition=[=($0, $2)], joinType=[inner])\n" +
- " PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+ " PhoenixServerProject(NAME=[$1])\n" +
+ " PhoenixServerJoin(condition=[=($0, $2)], joinType=[inner])\n" +
+ " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
" PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n" +
- " PhoenixProject(supplier_id=[$5])\n" +
+ " PhoenixServerProject(supplier_id=[$5])\n" +
" PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n")
.resultIs(new Object[][] {
{"S1", 2L},
@@ -338,8 +338,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
@Test public void testDistinct() {
start().sql("select distinct a_string from aTable")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixAggregate(group=[{0}])\n" +
- " PhoenixProject(A_STRING=[$2])\n" +
+ " PhoenixServerAggregate(group=[{0}])\n" +
+ " PhoenixServerProject(A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
.resultIs(new Object[][]{
{"a"},
@@ -351,8 +351,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
@Test public void testSort() {
start().sql("select organization_id, entity_id, a_string from aTable order by a_string, entity_id")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" +
- " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+ " PhoenixServerSort(sort0=[$2], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" +
+ " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
.resultIs(new Object[][] {
{"00D300000000XHP", "00A123122312312", "a"},
@@ -368,8 +368,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("select organization_id, entity_id, a_string from aTable order by organization_id, entity_id")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" +
- " PhoenixProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+ " PhoenixServerSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])\n" +
+ " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
.resultIs(new Object[][] {
{"00D300000000XHP", "00A123122312312", "a"},
@@ -385,10 +385,10 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("select count(entity_id), a_string from atable group by a_string order by count(entity_id), a_string desc")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(EXPR$0=[$1], A_STRING=[$0])\n" +
- " PhoenixSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
- " PhoenixAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
- " PhoenixProject(A_STRING=[$2])\n" +
+ " PhoenixClientSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n" +
+ " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
+ " PhoenixServerProject(A_STRING=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
.resultIs(new Object[][] {
{1L, "c"},
@@ -398,14 +398,14 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by count(\"item_id\"), s.name desc")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
- " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
- " PhoenixProject(NAME=[$1])\n" +
- " PhoenixJoin(condition=[=($0, $2)], joinType=[inner])\n" +
- " PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
- " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n" +
- " PhoenixProject(supplier_id=[$5])\n" +
- " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n")
+ " PhoenixClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+ " PhoenixServerProject(NAME=[$2])\n" +
+ " PhoenixServerJoin(condition=[=($1, $0)], joinType=[inner])\n" +
+ " PhoenixServerProject(supplier_id=[$5])\n" +
+ " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
+ " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
.resultIs(new Object[][] {
{"S6", 1L},
{"S5", 1L},
@@ -415,13 +415,13 @@ public class CalciteTest extends BaseClientManagedTimeIT {
start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" order by item.name desc")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixSort(sort0=[$1], dir0=[DESC])\n" +
- " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" +
- " PhoenixJoin(condition=[=($2, $3)], joinType=[inner])\n" +
- " PhoenixProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
- " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
- " PhoenixProject(supplier_id=[$0], NAME=[$1])\n" +
- " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
+ " PhoenixServerSort(sort0=[$1], dir0=[DESC])\n" +
+ " PhoenixServerProject(item_id=[$2], NAME=[$3], supplier_id=[$0], NAME0=[$1])\n" +
+ " PhoenixServerJoin(condition=[=($4, $0)], joinType=[inner])\n" +
+ " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n" +
+ " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
+ " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n")
.resultIs(new Object[][] {
{"0000000006", "T6", "0000000006", "S6"},
{"0000000005", "T5", "0000000005", "S5"},
@@ -435,15 +435,15 @@ public class CalciteTest extends BaseClientManagedTimeIT {
@Test public void testSubquery() {
start().sql("SELECT \"order_id\", quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")")
.explainIs("PhoenixToEnumerableConverter\n" +
- " PhoenixProject(order_id=[$0], QUANTITY=[$4])\n" +
- " PhoenixJoin(condition=[AND(=($2, $6), =($4, $7))], joinType=[inner])\n" +
+ " PhoenixServerProject(order_id=[$0], QUANTITY=[$4])\n" +
+ " PhoenixServerJoin(condition=[AND(=($2, $6), =($4, $7))], joinType=[inner])\n" +
" PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n" +
- " PhoenixAggregate(group=[{0}], EXPR$0=[MAX($1)])\n" +
- " PhoenixProject(item_id0=[$6], QUANTITY=[$4])\n" +
- " PhoenixJoin(condition=[=($6, $2)], joinType=[inner])\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$0=[MAX($1)])\n" +
+ " PhoenixServerProject(item_id0=[$6], QUANTITY=[$4])\n" +
+ " PhoenixServerJoin(condition=[=($6, $2)], joinType=[inner])\n" +
" PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n" +
- " PhoenixAggregate(group=[{0}])\n" +
- " PhoenixProject(item_id=[$2])\n" +
+ " PhoenixServerAggregate(group=[{0}])\n" +
+ " PhoenixServerProject(item_id=[$2])\n" +
" PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n")
.resultIs(new Object[][]{
{"000000000000001", 1000},
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index d2a4a31..4110b5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -5,8 +5,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
@@ -17,7 +15,6 @@ import org.apache.calcite.sql.SqlKind;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.calcite.PhoenixRel.Implementor;
-import org.apache.phoenix.calcite.PhoenixRel.PlanType;
import org.apache.phoenix.expression.ComparisonExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
@@ -42,13 +39,6 @@ public class CalciteUtils {
public static String createTempAlias() {
return "$" + tempAliasCounter.incrementAndGet();
}
-
- public static RelNode getBestRel(RelNode rel) {
- if (rel instanceof RelSubset)
- return ((RelSubset) rel).getBest();
-
- return rel;
- }
private static final Map<SqlKind, ExpressionFactory> EXPRESSION_MAP = Maps
.newHashMapWithExpectedSize(ExpressionType.values().length);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
index ca3a34c..c120be0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
@@ -1,30 +1,17 @@
package org.apache.phoenix.calcite;
-import java.sql.SQLException;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.execute.AggregatePlan;
-import org.apache.phoenix.execute.ClientAggregatePlan;
-import org.apache.phoenix.execute.HashJoinPlan;
-import org.apache.phoenix.execute.ScanPlan;
import org.apache.phoenix.execute.TupleProjectionPlan;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
@@ -33,8 +20,6 @@ import org.apache.phoenix.expression.aggregator.ClientAggregators;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.expression.function.AggregateFunction;
import org.apache.phoenix.expression.function.SingleAggregateFunction;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.RowKeyValueAccessor;
import org.apache.phoenix.schema.TableRef;
@@ -45,76 +30,30 @@ import com.google.common.collect.Lists;
* Implementation of {@link org.apache.calcite.rel.core.Aggregate}
* relational expression in Phoenix.
*/
-public class PhoenixAggregate extends Aggregate implements PhoenixRel {
+abstract public class PhoenixAggregate extends Aggregate implements PhoenixRel {
- public PhoenixAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) throws InvalidRelException {
+ protected PhoenixAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
assert getConvention() == PhoenixRel.CONVENTION;
for (AggregateCall aggCall : aggCalls) {
if (aggCall.isDistinct()) {
- throw new InvalidRelException( "distinct aggregation not supported");
+ throw new UnsupportedOperationException( "distinct aggregation not supported");
}
}
switch (getGroupType()) {
case SIMPLE:
break;
default:
- throw new InvalidRelException("unsupported group type: " + getGroupType());
+ throw new UnsupportedOperationException("unsupported group type: " + getGroupType());
}
}
- @Override
- public RelOptCost computeSelfCost(RelOptPlanner planner) {
- RelOptCost cost = super.computeSelfCost(planner);
- if (isServerAggregateDoable()) {
- cost = cost.multiplyBy(SERVER_FACTOR);
- }
- return cost.multiplyBy(PHOENIX_FACTOR);
- }
-
- @Override
- public PhoenixAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) {
- try {
- return new PhoenixAggregate(getCluster(), traits, input, indicator, groupSet, groupSets, aggregateCalls);
- } catch (InvalidRelException e) {
- // Semantic error not possible. Must be a bug. Convert to
- // internal error.
- throw new AssertionError(e);
- }
- }
-
- @Override
- public QueryPlan implement(Implementor implementor) {
- assert getConvention() == getInput().getConvention();
+ protected GroupBy getGroupBy(Implementor implementor) {
if (groupSets.size() > 1) {
throw new UnsupportedOperationException();
}
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
- TableRef tableRef = implementor.getTableRef();
- ScanPlan basePlan = null;
- if (plan instanceof ScanPlan) {
- basePlan = (ScanPlan) plan;
- } else if (plan instanceof HashJoinPlan) {
- QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
- if (delegate instanceof ScanPlan) {
- basePlan = (ScanPlan) delegate;
- }
- }
- // We can not merge with the base plan that has a limit already.
- // But if there is order-by without a limit, we can simply ignore the order-by.
- if (plan.getLimit() != null) {
- basePlan = null;
- }
- PhoenixStatement stmt = plan.getContext().getStatement();
- StatementContext context;
- try {
- context = basePlan == null ? new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)) : basePlan.getContext();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
-
List<Integer> ordinals = groupSet.asList();
// TODO check order-preserving
String groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
@@ -126,8 +65,11 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
Expression expr = implementor.newColumnExpression(ordinals.get(i));
keyExprs.add(expr);
}
- GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(keyExprs).setKeyExpressions(keyExprs).build();
+ return new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(keyExprs).setKeyExpressions(keyExprs).build();
+ }
+
+ protected void serializeAggregators(Implementor implementor, StatementContext context, boolean isEmptyGroupBy) {
// TODO sort aggFuncs. same problem with group by key sorting.
List<SingleAggregateFunction> aggFuncs = Lists.newArrayList();
for (AggregateCall call : aggCalls) {
@@ -137,37 +79,28 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
}
aggFuncs.add((SingleAggregateFunction) aggFunc);
}
- int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
+ int minNullableIndex = getMinNullableIndex(aggFuncs, isEmptyGroupBy);
context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
context.getAggregationManager().setAggregators(clientAggregators);
-
- SelectStatement select = SelectStatement.SELECT_STAR;
- QueryPlan aggPlan;
- if (basePlan == null) {
- aggPlan = new ClientAggregatePlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan);
- } else {
- aggPlan = new AggregatePlan(context, select, basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
- if (plan instanceof HashJoinPlan) {
- HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
- aggPlan = HashJoinPlan.create(select, aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
- }
- }
-
+ }
+
+ protected static QueryPlan wrapWithProject(Implementor implementor, QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> aggFuncs) {
List<Expression> exprs = Lists.newArrayList();
- for (int i = 0; i < keyExprs.size(); i++) {
- Expression keyExpr = keyExprs.get(i);
- RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExprs, i);
+ for (int i = 0; i < keyExpressions.size(); i++) {
+ Expression keyExpr = keyExpressions.get(i);
+ RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExpressions, i);
Expression expr = new RowKeyColumnExpression(keyExpr, accessor, keyExpr.getDataType());
exprs.add(expr);
}
for (SingleAggregateFunction aggFunc : aggFuncs) {
exprs.add(aggFunc);
}
+
TupleProjector tupleProjector = implementor.project(exprs);
PTable projectedTable = implementor.createProjectedTable();
implementor.setTableRef(new TableRef(projectedTable));
- return new TupleProjectionPlan(aggPlan, tupleProjector, null);
+ return new TupleProjectionPlan(plan, tupleProjector, null);
}
private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
@@ -182,15 +115,4 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
return minNullableIndex;
}
- private boolean isServerAggregateDoable() {
- RelNode rel = CalciteUtils.getBestRel(getInput());
- return rel instanceof PhoenixRel
- && ((PhoenixRel) rel).getPlanType() != PlanType.CLIENT_SERVER;
- }
-
- @Override
- public PlanType getPlanType() {
- return PlanType.CLIENT_SERVER;
- }
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientAggregate.java
new file mode 100644
index 0000000..b32f419
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientAggregate.java
@@ -0,0 +1,69 @@
+package org.apache.phoenix.calcite;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.TableRef;
+
+/** Aggregate node whose plan is a {@link ClientAggregatePlan}, wrapped in a projection, over the input's plan. */
+public class PhoenixClientAggregate extends PhoenixAggregate {
+
+    /** Hands every argument, unchanged, to the {@link PhoenixAggregate} constructor. */
+    public PhoenixClientAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator,
+            ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+        super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+    }
+
+    /** Creates a node of this class on the current cluster from the given traits, input and aggregate arguments. */
+    @Override
+    public PhoenixClientAggregate copy(RelTraitSet traits, RelNode input, boolean indicator,
+            ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) {
+        RelOptCluster cluster = getCluster();
+        return new PhoenixClientAggregate(cluster, traits, input, indicator, groupSet, groupSets, aggregateCalls);
+    }
+
+    /** Returns the cost computed by the superclass, multiplied by {@code PHOENIX_FACTOR}. */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        RelOptCost inherited = super.computeSelfCost(planner);
+        return inherited.multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /** Visits the input, then builds the client aggregate plan and its projection; SQLException is rethrown unchecked. */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        assert getConvention() == getInput().getConvention();
+        QueryPlan inputPlan = implementor.visitInput(0, (PhoenixRel) getInput());
+        TableRef tableRef = implementor.getTableRef();
+        PhoenixStatement statement = inputPlan.getContext().getStatement();
+        // The aggregation gets a StatementContext of its own, created around a new Scan.
+        StatementContext context;
+        try {
+            context = new StatementContext(statement, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(statement));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        GroupBy groupBy = super.getGroupBy(implementor);
+        super.serializeAggregators(implementor, context, groupBy.isEmpty());
+        QueryPlan aggPlan = new ClientAggregatePlan(context, inputPlan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, inputPlan);
+        return wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientJoin.java
new file mode 100644
index 0000000..49d7687
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientJoin.java
@@ -0,0 +1,55 @@
+package org.apache.phoenix.calcite;
+
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.phoenix.compile.QueryPlan;
+
+import com.google.common.collect.ImmutableSet;
+
+/** Join node that the planner can cost and copy, but whose {@link #implement} always throws. */
+public class PhoenixClientJoin extends PhoenixJoin {
+
+    /** Hands every argument, unchanged, to the {@link PhoenixJoin} constructor. */
+    public PhoenixClientJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+            RexNode condition, JoinRelType joinType, Set<String> variablesStopped) {
+        super(cluster, traits, left, right, condition, joinType, variablesStopped);
+    }
+
+    /** Copies this join with new traits, condition, inputs and type, keeping its variablesStopped; semiJoinDone is unused. */
+    @Override
+    public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, RelNode left,
+            RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
+        return new PhoenixClientJoin(getCluster(), traits, left, right, condition, joinRelType, variablesStopped);
+    }
+
+    /** Cost = makeCost(total, 0, 0) * PHOENIX_FACTOR, where total starts at this join's metadata row count. */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        double rowCount = RelMetadataQuery.getRowCount(this);
+        for (RelNode input : getInputs()) {
+            double inputRowCount = input.getRows();
+            if (Double.isInfinite(inputRowCount)) {
+                rowCount = inputRowCount; // an infinite input overwrites the running total
+            } else {
+                rowCount += Util.nLogN(inputRowCount);
+            }
+        }
+        // Only the first makeCost argument is populated; the remaining two are passed as 0.
+        return planner.getCostFactory().makeCost(rowCount, 0, 0).multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /** Always throws {@link UnsupportedOperationException}: this node has no QueryPlan translation here. */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        throw new UnsupportedOperationException("PhoenixClientJoin cannot be implemented as a QueryPlan");
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientProject.java
new file mode 100644
index 0000000..cbd4441
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientProject.java
@@ -0,0 +1,45 @@
+package org.apache.phoenix.calcite;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
+
+/** Project node whose plan is a {@link TupleProjectionPlan} placed over the input's plan. */
+public class PhoenixClientProject extends PhoenixProject {
+
+    /** Hands every argument, unchanged, to the {@link PhoenixProject} constructor. */
+    public PhoenixClientProject(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+        super(cluster, traits, input, projects, rowType);
+    }
+
+    /** Creates a node of this class on the current cluster from the given traits, input, projects and row type. */
+    @Override
+    public PhoenixClientProject copy(RelTraitSet traits, RelNode input, List<RexNode> projects, RelDataType rowType) {
+        return new PhoenixClientProject(getCluster(), traits, input, projects, rowType);
+    }
+
+    /** Returns the cost computed by the superclass, multiplied by {@code PHOENIX_FACTOR}. */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        RelOptCost inherited = super.computeSelfCost(planner);
+        return inherited.multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /** Visits the input first, then calls {@code project(implementor)} and wraps the input's plan with the result. */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        assert getConvention() == getInput().getConvention();
+        QueryPlan inputPlan = implementor.visitInput(0, (PhoenixRel) getInput());
+        TupleProjector projector = project(implementor);
+        return new TupleProjectionPlan(inputPlan, projector, null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
new file mode 100644
index 0000000..6bb67fb
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
@@ -0,0 +1,65 @@
+package org.apache.phoenix.calcite;
+
+import java.sql.SQLException;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.TableRef;
+
+/** Sort node whose plan is a {@link ClientScanPlan}, given this node's order-by and limit, over the input's plan. */
+public class PhoenixClientSort extends PhoenixSort {
+
+    /** Hands every argument, unchanged, to the {@link PhoenixSort} constructor. */
+    public PhoenixClientSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+        super(cluster, traits, child, collation, offset, fetch);
+    }
+
+    /** Creates a node of this class on the current cluster from the given traits, input, collation, offset and fetch. */
+    @Override
+    public PhoenixClientSort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation, RexNode offset, RexNode fetch) {
+        RelOptCluster cluster = getCluster();
+        return new PhoenixClientSort(cluster, traitSet, newInput, newCollation, offset, fetch);
+    }
+
+    /** Returns the cost computed by the superclass, multiplied by {@code PHOENIX_FACTOR}. */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        RelOptCost inherited = super.computeSelfCost(planner);
+        return inherited.multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /** Rejects a non-null offset, visits the input, then builds the client scan plan; SQLException is rethrown unchecked. */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        assert getConvention() == getInput().getConvention();
+        if (offset != null) {
+            throw new UnsupportedOperationException();
+        }
+        QueryPlan inputPlan = implementor.visitInput(0, (PhoenixRel) getInput());
+        TableRef tableRef = implementor.getTableRef();
+        PhoenixStatement statement = inputPlan.getContext().getStatement();
+        StatementContext context;
+        try {
+            context = new StatementContext(statement, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(statement));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        OrderBy orderBy = super.getOrderBy(implementor);
+        Integer limit = super.getLimit(implementor);
+        return new ClientScanPlan(context, inputPlan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, limit, null, orderBy, inputPlan);
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
new file mode 100644
index 0000000..7037598
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
@@ -0,0 +1,66 @@
+package org.apache.phoenix.calcite;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.parse.SelectStatement;
+
+public class PhoenixCompactClientSort extends PhoenixSort {
+ // Sort variant that rebuilds the input's AggregatePlan with an order-by and limit via AggregatePlan.create (see implement).
+ public PhoenixCompactClientSort(RelOptCluster cluster, RelTraitSet traits,
+ RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, child, collation, offset, fetch);
+ }
+ /** Returns a new PhoenixCompactClientSort on this node's cluster built from the given traits, input, collation, offset and fetch. */
+ @Override
+ public PhoenixCompactClientSort copy(RelTraitSet traitSet, RelNode newInput,
+ RelCollation newCollation, RexNode offset, RexNode fetch) {
+ return new PhoenixCompactClientSort(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ }
+ /** Returns the cost computed by the superclass multiplied by CLIENT_MERGE_FACTOR and then by PHOENIX_FACTOR. */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(CLIENT_MERGE_FACTOR)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+ /** Visits the input, expects an AggregatePlan (directly or as the delegate of a HashJoinPlan) and returns it rebuilt with this node's order-by and limit; throws UnsupportedOperationException when an offset is set. */
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ if (this.offset != null)
+ throw new UnsupportedOperationException();
+ // NOTE(review): the plan-type checks below are asserts only; with assertions disabled a mismatch surfaces as a ClassCastException at the casts.
+ QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ assert (plan instanceof AggregatePlan || plan instanceof HashJoinPlan)
+ && plan.getLimit() == null;
+ // The plan to rebuild is either the visited plan itself or the delegate of the visited HashJoinPlan.
+ AggregatePlan basePlan;
+ if (plan instanceof AggregatePlan) {
+ basePlan = (AggregatePlan) plan;
+ } else {
+ QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+ assert delegate instanceof AggregatePlan;
+ basePlan = (AggregatePlan) delegate;
+ }
+
+ OrderBy orderBy = super.getOrderBy(implementor);
+ Integer limit = super.getLimit(implementor);
+ QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy, limit); // NOTE(review): cast is redundant, basePlan is declared as AggregatePlan
+ // When the visited plan was a HashJoinPlan, wrap the new aggregate plan again with the same join info and sub-plans.
+ if (plan instanceof HashJoinPlan) {
+ HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
+ newPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ }
+ return newPlan;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
index 0a49477..f569605 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixFilter.java
@@ -38,9 +38,4 @@ public class PhoenixFilter extends Filter implements PhoenixRel {
return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(),
plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan);
}
-
- @Override
- public PlanType getPlanType() {
- return PlanType.CLIENT_SERVER;
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
index 0b85217..d466635 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
@@ -1,136 +1,26 @@
package org.apache.phoenix.calcite;
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.util.Util;
-import org.apache.phoenix.compile.JoinCompiler;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.execute.HashJoinPlan;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
-import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
/**
* Implementation of {@link org.apache.calcite.rel.core.Join}
* relational expression in Phoenix.
*/
-public class PhoenixJoin extends Join implements PhoenixRel {
+abstract public class PhoenixJoin extends Join implements PhoenixRel {
public PhoenixJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, JoinRelType joinType, Set<String> variablesStopped) {
super( cluster, traits, left, right, condition, joinType, variablesStopped);
assert getConvention() == PhoenixRel.CONVENTION;
}
-
- @Override
- public PhoenixJoin copy(RelTraitSet traits, RexNode condition, RelNode left, RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
- return new PhoenixJoin(getCluster(), traits, left, right, condition, joinRelType, ImmutableSet.<String>of());
- }
-
- @Override
- public RelOptCost computeSelfCost(RelOptPlanner planner) {
- double rowCount = RelMetadataQuery.getRowCount(this);
-
- for (RelNode input : getInputs()) {
- double inputRowCount = input.getRows();
- if (Double.isInfinite(inputRowCount)) {
- rowCount = inputRowCount;
- } else if (input == getLeft() && isHashJoinDoable()) {
- rowCount += inputRowCount;
- } else {
- rowCount += Util.nLogN(inputRowCount);
- }
- }
- RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0);
-
- return cost.multiplyBy(PHOENIX_FACTOR);
- }
-
- @Override
- public QueryPlan implement(Implementor implementor) {
- assert getLeft().getConvention() == PhoenixRel.CONVENTION;
- assert getRight().getConvention() == PhoenixRel.CONVENTION;
- PhoenixRel left = (PhoenixRel) getLeft();
- PhoenixRel right = (PhoenixRel) getRight();
- if (!isHashJoinDoable())
- throw new UnsupportedOperationException();
-
- JoinInfo joinInfo = JoinInfo.of(left, right, getCondition());
- List<Expression> leftExprs = Lists.<Expression> newArrayList();
- List<Expression> rightExprs = Lists.<Expression> newArrayList();
- implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true));
- QueryPlan leftPlan = implementor.visitInput(0, left);
- PTable leftTable = implementor.getTableRef().getTable();
- for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) {
- Integer index = iter.next();
- leftExprs.add(implementor.newColumnExpression(index));
- }
- if (leftExprs.isEmpty()) {
- leftExprs.add(LiteralExpression.newConstant(0));
- }
- implementor.popContext();
- implementor.pushContext(new ImplementorContext(false, true));
- QueryPlan rightPlan = implementor.visitInput(1, right);
- PTable rightTable = implementor.getTableRef().getTable();
- for (Iterator<Integer> iter = joinInfo.rightKeys.iterator(); iter.hasNext();) {
- Integer index = iter.next();
- rightExprs.add(implementor.newColumnExpression(index));
- }
- if (rightExprs.isEmpty()) {
- rightExprs.add(LiteralExpression.newConstant(0));
- }
- implementor.popContext();
-
- JoinType type = convertJoinType(getJoinType());
- PTable joinedTable;
- try {
- joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- implementor.setTableRef(new TableRef(joinedTable));
- RexNode postFilter = joinInfo.getRemaining(getCluster().getRexBuilder());
- Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : CalciteUtils.toExpression(postFilter, implementor);
- @SuppressWarnings("unchecked")
- HashJoinInfo hashJoinInfo = new HashJoinInfo(
- joinedTable, new ImmutableBytesPtr[] {new ImmutableBytesPtr()},
- (List<Expression>[]) (new List[] {leftExprs}),
- new JoinType[] {type}, new boolean[] {true},
- new PTable[] {rightTable},
- new int[] {leftTable.getColumns().size() - leftTable.getPKColumns().size()},
- postFilterExpr, null);
-
- return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)});
- }
-
- private boolean isHashJoinDoable() {
- // TODO check memory limit
- RelNode rel = CalciteUtils.getBestRel(getLeft());
- return rel instanceof PhoenixRel
- && ((PhoenixRel) rel).getPlanType() == PlanType.SERVER_ONLY_FLAT
- && getJoinType() != JoinRelType.RIGHT;
- }
- private JoinType convertJoinType(JoinRelType type) {
+ protected static JoinType convertJoinType(JoinRelType type) {
JoinType ret = null;
switch (type) {
case INNER:
@@ -150,9 +40,4 @@ public class PhoenixJoin extends Join implements PhoenixRel {
return ret;
}
-
- @Override
- public PlanType getPlanType() {
- return isHashJoinDoable() ? PlanType.SERVER_ONLY_COMPLEX : PlanType.CLIENT_SERVER;
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
index 53793d7..12ffcfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
@@ -3,16 +3,11 @@ package org.apache.phoenix.calcite;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.execute.TupleProjectionPlan;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.schema.PTable;
@@ -24,32 +19,13 @@ import com.google.common.collect.Lists;
* Implementation of {@link org.apache.calcite.rel.core.Project}
* relational expression in Phoenix.
*/
-public class PhoenixProject extends Project implements PhoenixRel {
+abstract public class PhoenixProject extends Project implements PhoenixRel {
public PhoenixProject(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
super(cluster, traits, input, projects, rowType);
assert getConvention() == PhoenixRel.CONVENTION;
}
-
- @Override
- public PhoenixProject copy(RelTraitSet traits, RelNode input, List<RexNode> projects, RelDataType rowType) {
- return new PhoenixProject(getCluster(), traits, input, projects, rowType);
- }
-
- public RelOptCost computeSelfCost(RelOptPlanner planner) {
- RelOptCost cost = super.computeSelfCost(planner);
- if (getPlanType() != PlanType.CLIENT_SERVER) {
- cost = cost.multiplyBy(SERVER_FACTOR);
- }
- return cost.multiplyBy(PHOENIX_FACTOR);
- }
-
- @Override
- public QueryPlan implement(Implementor implementor) {
- assert getConvention() == getInput().getConvention();
- implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), false));
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
- implementor.popContext();
-
+
+ protected TupleProjector project(Implementor implementor) {
List<Expression> exprs = Lists.newArrayList();
for (RexNode project : getProjects()) {
exprs.add(CalciteUtils.toExpression(project, implementor));
@@ -57,20 +33,7 @@ public class PhoenixProject extends Project implements PhoenixRel {
TupleProjector tupleProjector = implementor.project(exprs);
PTable projectedTable = implementor.createProjectedTable();
implementor.setTableRef(new TableRef(projectedTable));
-
- boolean isScan = plan instanceof ScanPlan;
- if (getPlanType() == PlanType.CLIENT_SERVER
- || TupleProjector.hasProjector(plan.getContext().getScan(), isScan))
- return new TupleProjectionPlan(plan, tupleProjector, null);
-
- TupleProjector.serializeProjectorIntoScan(plan.getContext().getScan(), tupleProjector, isScan);
- return plan;
- }
- @Override
- public PlanType getPlanType() {
- RelNode rel = CalciteUtils.getBestRel(getInput());
- return rel instanceof PhoenixRel && !(rel instanceof PhoenixProject) ?
- ((PhoenixRel) rel).getPlanType() : PlanType.CLIENT_SERVER;
+ return tupleProjector;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
index f5943da..42108ee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
@@ -35,14 +35,6 @@ public interface PhoenixRel extends RelNode {
* Server is cheaper.
*/
double SERVER_FACTOR = 0.2;
-
- enum PlanType {
- SERVER_ONLY_FLAT,
- SERVER_ONLY_COMPLEX,
- CLIENT_SERVER,
- }
-
- PlanType getPlanType();
QueryPlan implement(Implementor implementor);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
index 843e6de..9130f77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRules.java
@@ -2,16 +2,14 @@ package org.apache.phoenix.calcite;
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
import org.apache.calcite.plan.*;
-import org.apache.calcite.rel.InvalidRelException;
-import org.apache.calcite.rel.RelCollationImpl;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.util.trace.CalciteTrace;
import java.util.logging.Logger;
@@ -58,16 +56,16 @@ public class PhoenixRules {
public static final PhoenixSortRule INSTANCE = new PhoenixSortRule();
private PhoenixSortRule() {
- super(Sort.class, Convention.NONE, PhoenixRel.CONVENTION,
+ super(LogicalSort.class, Convention.NONE, PhoenixRel.CONVENTION,
"PhoenixSortRule");
}
public RelNode convert(RelNode rel) {
- final Sort sort = (Sort) rel;
+ final LogicalSort sort = (LogicalSort) rel;
final RelTraitSet traitSet =
sort.getTraitSet().replace(out)
.replace(sort.getCollation());
- return new PhoenixSort(rel.getCluster(), traitSet,
+ return new PhoenixClientSort(rel.getCluster(), traitSet,
convert(sort.getInput(), sort.getInput().getTraitSet().replace(out)),
sort.getCollation(), sort.offset, sort.fetch);
}
@@ -111,7 +109,7 @@ public class PhoenixRules {
public RelNode convert(RelNode rel) {
final LogicalProject project = (LogicalProject) rel;
final RelTraitSet traitSet = project.getTraitSet().replace(out);
- return new PhoenixProject(project.getCluster(), traitSet,
+ return new PhoenixClientProject(project.getCluster(), traitSet,
convert(project.getInput(), project.getInput().getTraitSet().replace(out)), project.getProjects(),
project.getRowType());
}
@@ -133,8 +131,7 @@ public class PhoenixRules {
final LogicalAggregate agg = (LogicalAggregate) rel;
final RelTraitSet traitSet =
agg.getTraitSet().replace(out);
- try {
- return new PhoenixAggregate(
+ return new PhoenixClientAggregate(
rel.getCluster(),
traitSet,
convert(agg.getInput(), agg.getInput().getTraitSet().replace(out)),
@@ -142,10 +139,6 @@ public class PhoenixRules {
agg.getGroupSet(),
agg.getGroupSets(),
agg.getAggCallList());
- } catch (InvalidRelException e) {
- LOGGER.warning(e.toString());
- return null;
- }
}
}
@@ -157,12 +150,12 @@ public class PhoenixRules {
public static final PhoenixUnionRule INSTANCE = new PhoenixUnionRule();
private PhoenixUnionRule() {
- super(Union.class, Convention.NONE, PhoenixRel.CONVENTION,
+ super(LogicalUnion.class, Convention.NONE, PhoenixRel.CONVENTION,
"PhoenixUnionRule");
}
public RelNode convert(RelNode rel) {
- final Union union = (Union) rel;
+ final LogicalUnion union = (LogicalUnion) rel;
final RelTraitSet traitSet = union.getTraitSet().replace(out);
return new PhoenixUnion(rel.getCluster(), traitSet, convertList(union.getInputs(), out),
union.all);
@@ -177,15 +170,15 @@ public class PhoenixRules {
public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule();
private PhoenixJoinRule() {
- super(Join.class, Convention.NONE, PhoenixRel.CONVENTION,
+ super(LogicalJoin.class, Convention.NONE, PhoenixRel.CONVENTION,
"PhoenixJoinRule");
}
public RelNode convert(RelNode rel) {
- final Join join = (Join) rel;
+ final LogicalJoin join = (LogicalJoin) rel;
final RelTraitSet traitSet =
join.getTraitSet().replace(out);
- return new PhoenixJoin(rel.getCluster(), traitSet,
+ return new PhoenixClientJoin(rel.getCluster(), traitSet,
convert(join.getLeft(), join.getLeft().getTraitSet().replace(out)),
convert(join.getRight(), join.getRight().getTraitSet().replace(out)),
join.getCondition(),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
new file mode 100644
index 0000000..a535a35
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
@@ -0,0 +1,73 @@
+package org.apache.phoenix.calcite;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.parse.SelectStatement;
+
+public class PhoenixServerAggregate extends PhoenixAggregate {
+ // Aggregate variant that builds an AggregatePlan directly on the input's ScanPlan, reusing that plan's StatementContext (see implement).
+ public PhoenixServerAggregate(RelOptCluster cluster, RelTraitSet traits,
+ RelNode child, boolean indicator, ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+ }
+ /** Returns a new PhoenixServerAggregate on this node's cluster built from the given traits, input, indicator, group sets and aggregate calls. */
+ @Override
+ public PhoenixServerAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) {
+ return new PhoenixServerAggregate(getCluster(), traits, input, indicator, groupSet, groupSets, aggregateCalls);
+ }
+ /** Returns the cost computed by the superclass multiplied by SERVER_FACTOR and then by PHOENIX_FACTOR. */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(SERVER_FACTOR)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+ /** Visits the input, expects a ScanPlan (directly or as the delegate of a HashJoinPlan) without a limit, and returns an AggregatePlan over it passed through PhoenixAggregate.wrapWithProject. */
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ // NOTE(review): the plan-type checks below are asserts only; with assertions disabled a mismatch surfaces as a ClassCastException at the casts.
+ QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan)
+ && plan.getLimit() == null;
+ // The base scan is either the visited plan itself or the delegate of the visited HashJoinPlan.
+ ScanPlan basePlan;
+ if (plan instanceof ScanPlan) {
+ basePlan = (ScanPlan) plan;
+ } else {
+ QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+ assert delegate instanceof ScanPlan;
+ basePlan = (ScanPlan) delegate;
+ }
+ // The base scan's context is reused: aggregators are serialized against it and it is handed to the new AggregatePlan.
+ StatementContext context = basePlan.getContext();
+ GroupBy groupBy = super.getGroupBy(implementor);
+ super.serializeAggregators(implementor, context, groupBy.isEmpty());
+ // The AggregatePlan takes the base plan's statement and table ref, an empty projector, an empty order-by and the computed group-by.
+ QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
+ if (plan instanceof HashJoinPlan) {
+ HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
+ aggPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ }
+ // wrapWithProject receives the group-by key expressions and the aggregator functions registered on the context.
+ return PhoenixAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerJoin.java
new file mode 100644
index 0000000..4e11294
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerJoin.java
@@ -0,0 +1,124 @@
+package org.apache.phoenix.calcite;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.phoenix.compile.JoinCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+public class PhoenixServerJoin extends PhoenixJoin {
+ // Join variant that is implemented as a HashJoinPlan: the left plan is the main plan, the right plan the single hash sub-plan.
+ public PhoenixServerJoin(RelOptCluster cluster, RelTraitSet traits,
+ RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType, Set<String> variablesStopped) {
+ super(cluster, traits, left, right, condition, joinType,
+ variablesStopped);
+ }
+ /** Returns a new PhoenixServerJoin on this node's cluster with the given traits, condition, inputs and join type, keeping this node's stopped variables; semiJoinDone is not used. */
+ @Override
+ public PhoenixServerJoin copy(RelTraitSet traits, RexNode condition, RelNode left,
+ RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
+ return new PhoenixServerJoin(getCluster(), traits, left, right, condition, joinRelType, getVariablesStopped());
+ }
+ /** Row-count based cost: this join's row count plus the left input's row count plus Util.nLogN of every other input's row count (an infinite input makes the total infinite), multiplied by PHOENIX_FACTOR. */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ //TODO return infinite cost if RHS size exceeds memory limit.
+
+ double rowCount = RelMetadataQuery.getRowCount(this);
+
+ for (RelNode input : getInputs()) {
+ double inputRowCount = input.getRows();
+ if (Double.isInfinite(inputRowCount)) {
+ rowCount = inputRowCount;
+ } else if (input == getLeft()) {
+ rowCount += inputRowCount;
+ } else {
+ rowCount += Util.nLogN(inputRowCount);
+ }
+ }
+ RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0);
+
+ return cost.multiplyBy(PHOENIX_FACTOR);
+ }
+ /** Visits both inputs, derives the join-key expressions for each side, joins the two projected tables and returns a HashJoinPlan over the left plan with the right plan as its only HashSubPlan. */
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getLeft().getConvention() == PhoenixRel.CONVENTION;
+ assert getRight().getConvention() == PhoenixRel.CONVENTION;
+ PhoenixRel left = (PhoenixRel) getLeft();
+ PhoenixRel right = (PhoenixRel) getRight();
+ // Each side is visited inside its own pushed ImplementorContext; key expressions are created before that context is popped.
+ JoinInfo joinInfo = JoinInfo.of(left, right, getCondition());
+ List<Expression> leftExprs = Lists.<Expression> newArrayList();
+ List<Expression> rightExprs = Lists.<Expression> newArrayList();
+ implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true));
+ QueryPlan leftPlan = implementor.visitInput(0, left);
+ PTable leftTable = implementor.getTableRef().getTable();
+ // One column expression per left join key; a constant 0 stands in when there are no keys.
+ for (Integer index : joinInfo.leftKeys) {
+ leftExprs.add(implementor.newColumnExpression(index));
+ }
+ if (leftExprs.isEmpty()) {
+ leftExprs.add(LiteralExpression.newConstant(0));
+ }
+ implementor.popContext();
+ implementor.pushContext(new ImplementorContext(false, true));
+ QueryPlan rightPlan = implementor.visitInput(1, right);
+ PTable rightTable = implementor.getTableRef().getTable();
+ // One column expression per right join key; a constant 0 stands in when there are no keys.
+ for (Integer index : joinInfo.rightKeys) {
+ rightExprs.add(implementor.newColumnExpression(index));
+ }
+ if (rightExprs.isEmpty()) {
+ rightExprs.add(LiteralExpression.newConstant(0));
+ }
+ implementor.popContext();
+ // The joined table becomes the implementor's table ref before the remaining (non-key) condition is converted.
+ JoinType type = convertJoinType(getJoinType());
+ PTable joinedTable;
+ try {
+ joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ implementor.setTableRef(new TableRef(joinedTable));
+ RexNode postFilter = joinInfo.getRemaining(getCluster().getRexBuilder());
+ Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : CalciteUtils.toExpression(postFilter, implementor);
+ @SuppressWarnings("unchecked")
+ HashJoinInfo hashJoinInfo = new HashJoinInfo(
+ joinedTable, new ImmutableBytesPtr[] {new ImmutableBytesPtr()},
+ (List<Expression>[]) (new List[] {leftExprs}),
+ new JoinType[] {type}, new boolean[] {true},
+ new PTable[] {rightTable},
+ new int[] {leftTable.getColumns().size() - leftTable.getPKColumns().size()},
+ postFilterExpr, null);
+
+ return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerProject.java
new file mode 100644
index 0000000..218f783
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerProject.java
@@ -0,0 +1,51 @@
+package org.apache.phoenix.calcite;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.TupleProjector;
+
+public class PhoenixServerProject extends PhoenixProject {
+ // Project variant that serializes its TupleProjector into the input plan's Scan and returns that same plan (see implement).
+ public PhoenixServerProject(RelOptCluster cluster, RelTraitSet traits,
+ RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+ /** Returns a new PhoenixServerProject on this node's cluster built from the given traits, input, projects and row type. */
+ @Override
+ public PhoenixServerProject copy(RelTraitSet traits, RelNode input,
+ List<RexNode> projects, RelDataType rowType) {
+ return new PhoenixServerProject(getCluster(), traits, input, projects, rowType);
+ }
+ /** Returns the cost computed by the superclass multiplied by SERVER_FACTOR and then by PHOENIX_FACTOR. */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(SERVER_FACTOR)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+ /** Visits the input, expects a ScanPlan or HashJoinPlan whose Scan has no projector yet, writes this node's TupleProjector into that Scan and returns the input's plan unchanged otherwise. */
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ // The input is visited inside a pushed context that keeps the current retain-PK-columns setting and passes false as the second flag.
+ implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), false));
+ QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ implementor.popContext();
+ assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan)
+ && !TupleProjector.hasProjector(plan.getContext().getScan(), plan instanceof ScanPlan);
+ // No new plan object is created: the projector is attached to the existing plan's Scan.
+ TupleProjector tupleProjector = super.project(implementor);
+ TupleProjector.serializeProjectorIntoScan(plan.getContext().getScan(), tupleProjector, plan instanceof ScanPlan);
+ return plan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8cde1c54/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
new file mode 100644
index 0000000..1538309
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
@@ -0,0 +1,73 @@
+package org.apache.phoenix.calcite;
+
+import java.sql.SQLException;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.parse.SelectStatement;
+
+public class PhoenixServerSort extends PhoenixSort { // PhoenixSort variant whose implement() rebuilds the input's ScanPlan with this node's ORDER BY and limit
+ /** Creates the node; all arguments are passed unchanged to the PhoenixSort constructor. */
+ public PhoenixServerSort(RelOptCluster cluster, RelTraitSet traits,
+ RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, child, collation, offset, fetch);
+ }
+ /** Returns a new PhoenixServerSort on this node's cluster, built from the given traits, input, collation, offset and fetch. */
+ @Override
+ public PhoenixServerSort copy(RelTraitSet traitSet, RelNode newInput,
+ RelCollation newCollation, RexNode offset, RexNode fetch) {
+ return new PhoenixServerSort(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ }
+ /** Returns the superclass cost multiplied by SERVER_FACTOR and then by PHOENIX_FACTOR (both are declared outside this class). */
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(SERVER_FACTOR)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+ /** Builds the plan for this sort; throws UnsupportedOperationException when an offset is set and RuntimeException wrapping any SQLException from ScanPlan.create. */
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ if (this.offset != null)
+ throw new UnsupportedOperationException();
+ // Offsets are rejected above. The input must implement to a ScanPlan or HashJoinPlan that carries no limit yet (checked only when assertions are enabled).
+ QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan)
+ && plan.getLimit() == null;
+ // Pick the ScanPlan to rebuild: the plan itself, or the delegate of a HashJoinPlan.
+ ScanPlan basePlan;
+ if (plan instanceof ScanPlan) {
+ basePlan = (ScanPlan) plan;
+ } else {
+ QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+ assert delegate instanceof ScanPlan;
+ basePlan = (ScanPlan) delegate;
+ }
+ // Create a new ScanPlan from the base plan using the ORDER BY and limit obtained from the superclass helpers.
+ OrderBy orderBy = super.getOrderBy(implementor);
+ Integer limit = super.getLimit(implementor);
+ QueryPlan newPlan;
+ try {
+ newPlan = ScanPlan.create((ScanPlan) basePlan, orderBy, limit);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ // For a hash-join input, wrap the new scan plan in a new HashJoinPlan that reuses the original statement, join info and sub-plans.
+ if (plan instanceof HashJoinPlan) {
+ HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
+ newPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ }
+ return newPlan;
+ }
+
+}