You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/15 01:23:24 UTC
[2/3] phoenix git commit: Add PhoenixClientJoinRule;
File move and rename.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java
deleted file mode 100644
index 6e01abb..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixValues.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.apache.phoenix.calcite;
-
-import java.util.List;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Values;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.phoenix.compile.QueryPlan;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Implementation of {@link org.apache.calcite.rel.core.Values}
- * relational expression in Phoenix.
- */
-public class PhoenixValues extends Values implements PhoenixRel {
- public PhoenixValues(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) {
- super(cluster, rowType, tuples, traits);
- assert getConvention() == PhoenixRel.CONVENTION;
- }
-
- @Override
- public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) {
- assert traitSet.containsIfApplicable(Convention.NONE);
- assert inputs.isEmpty();
- return new PhoenixValues(getCluster(), rowType, tuples, traitSet);
- }
-
- @Override
- public QueryPlan implement(Implementor implementor) {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
new file mode 100644
index 0000000..adc9b63
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
@@ -0,0 +1,119 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.AggregateFunction;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Aggregate}
+ * relational expression in Phoenix.
+ */
+abstract public class PhoenixAbstractAggregate extends Aggregate implements PhoenixRel {
+
+ protected PhoenixAbstractAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+ assert getConvention() == PhoenixRel.CONVENTION;
+
+ // DISTINCT aggregate calls are rejected up front.
+ for (AggregateCall call : aggCalls) {
+ if (call.isDistinct()) {
+ throw new UnsupportedOperationException("distinct aggregation not supported");
+ }
+ }
+
+ // Any group type other than SIMPLE is rejected as well.
+ if (getGroupType() != Group.SIMPLE) {
+ throw new UnsupportedOperationException("unsupported group type: " + getGroupType());
+ }
+ }
+
+ protected GroupBy getGroupBy(Implementor implementor) {
+ // Only a single grouping set can be translated.
+ if (groupSets.size() > 1) {
+ throw new UnsupportedOperationException();
+ }
+ // TODO check order-preserving
+ // TODO sort group by keys. not sure if there is a way to avoid this sorting,
+ // otherwise we would have to add an extra projection.
+ // TODO convert key types. can be avoided?
+ List<Integer> ordinals = groupSet.asList();
+ List<Expression> keyExprs = Lists.newArrayListWithExpectedSize(ordinals.size());
+ for (Integer ordinal : ordinals) {
+ keyExprs.add(implementor.newColumnExpression(ordinal));
+ }
+
+ GroupBy.GroupByBuilder builder = new GroupBy.GroupByBuilder();
+ builder.setScanAttribName(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS);
+ return builder.setExpressions(keyExprs).setKeyExpressions(keyExprs).build();
+ }
+
+ protected void serializeAggregators(Implementor implementor, StatementContext context, boolean isEmptyGroupBy) {
+ // TODO sort aggFuncs. same problem with group by key sorting.
+ List<SingleAggregateFunction> functions = Lists.newArrayList();
+ for (AggregateCall aggCall : aggCalls) {
+ AggregateFunction function = CalciteUtils.toAggregateFunction(aggCall.getAggregation(), aggCall.getArgList(), implementor);
+ if (!(function instanceof SingleAggregateFunction)) {
+ throw new UnsupportedOperationException();
+ }
+ functions.add((SingleAggregateFunction) function);
+ }
+ // The same functions and nullable boundary feed both the scan attribute and the client aggregators.
+ int minNullableIndex = getMinNullableIndex(functions, isEmptyGroupBy);
+ context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(functions, minNullableIndex));
+ context.getAggregationManager().setAggregators(new ClientAggregators(functions, minNullableIndex));
+ }
+
+ protected static QueryPlan wrapWithProject(Implementor implementor, QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> aggFuncs) {
+ // Projected columns: one RowKeyColumnExpression per group key, then the aggregate functions.
+ List<Expression> projected = Lists.newArrayList();
+ int position = 0;
+ for (Expression keyExpr : keyExpressions) {
+ RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExpressions, position++);
+ projected.add(new RowKeyColumnExpression(keyExpr, accessor, keyExpr.getDataType()));
+ }
+ projected.addAll(aggFuncs);
+
+ // project() is invoked before createProjectedTable(); the new table then becomes the implementor's table ref.
+ TupleProjector tupleProjector = implementor.project(projected);
+ PTable projectedTable = implementor.createProjectedTable();
+ implementor.setTableRef(new TableRef(projectedTable));
+
+ return new TupleProjectionPlan(plan, tupleProjector, null);
+ }
+
+ private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
+ // First index whose aggregator (ungrouped) or aggregator expression (grouped) is nullable; size() if none.
+ for (int i = 0; i < aggFuncs.size(); i++) {
+ SingleAggregateFunction aggFunc = aggFuncs.get(i);
+ boolean nullable = isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable();
+ if (nullable) {
+ return i;
+ }
+ }
+ return aggFuncs.size();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
new file mode 100644
index 0000000..86ad41f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
@@ -0,0 +1,43 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Join}
+ * relational expression in Phoenix.
+ */
+abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel {
+ public PhoenixAbstractJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, JoinRelType joinType, Set<String> variablesStopped) {
+ super( cluster, traits, left, right, condition, joinType, variablesStopped);
+ assert getConvention() == PhoenixRel.CONVENTION;
+ }
+
+ /**
+ * Maps a Calcite {@link JoinRelType} to the corresponding Phoenix {@link JoinType}.
+ *
+ * @throws UnsupportedOperationException if {@code type} has no Phoenix counterpart
+ */
+ protected static JoinType convertJoinType(JoinRelType type) {
+ switch (type) {
+ case INNER:
+ return JoinType.Inner;
+ case LEFT:
+ return JoinType.Left;
+ case RIGHT:
+ return JoinType.Right;
+ case FULL:
+ return JoinType.Full;
+ default:
+ // Fail fast instead of returning null for a type this switch does not know.
+ throw new UnsupportedOperationException("unsupported join type: " + type);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
new file mode 100644
index 0000000..2c77e9f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
@@ -0,0 +1,40 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Project}
+ * relational expression in Phoenix.
+ */
+abstract public class PhoenixAbstractProject extends Project implements PhoenixRel {
+ public PhoenixAbstractProject(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ assert getConvention() == PhoenixRel.CONVENTION;
+ }
+
+ protected TupleProjector project(Implementor implementor) {
+ // Translate every projected RexNode into a Phoenix expression, then project and swap in the projected table.
+ List<RexNode> rexNodes = getProjects();
+ List<Expression> exprs = Lists.newArrayListWithExpectedSize(rexNodes.size());
+ for (RexNode rexNode : rexNodes) {
+ exprs.add(CalciteUtils.toExpression(rexNode, implementor));
+ }
+ TupleProjector tupleProjector = implementor.project(exprs);
+ implementor.setTableRef(new TableRef(implementor.createProjectedTable()));
+ return tupleProjector;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
new file mode 100644
index 0000000..708b5ae
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
@@ -0,0 +1,68 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.schema.SortOrder;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Sort}
+ * relational expression in Phoenix.
+ *
+ * <p>Like {@code Sort}, it also supports LIMIT and OFFSET.
+ */
+abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
+ protected static final double CLIENT_MERGE_FACTOR = 0.5;
+
+ public PhoenixAbstractSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, child, collation, offset, fetch);
+ assert getConvention() == PhoenixRel.CONVENTION;
+ }
+
+ protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector) {
+ // One OrderByExpression per field collation; with a projector the sort keys come from its expressions.
+ List<RelFieldCollation> fieldCollations = getCollation().getFieldCollations();
+ assert !fieldCollations.isEmpty();
+
+ List<OrderByExpression> orderByExpressions = Lists.newArrayList();
+ for (RelFieldCollation fieldCollation : fieldCollations) {
+ int index = fieldCollation.getFieldIndex();
+ Expression expr = tupleProjector == null ? implementor.newColumnExpression(index) : tupleProjector.getExpressions()[index];
+ // An expression whose own sort order is DESC flips the requested direction.
+ boolean isAscending = (fieldCollation.getDirection() == Direction.ASCENDING) != (expr.getSortOrder() == SortOrder.DESC);
+ boolean isNullsLast = fieldCollation.nullDirection == NullDirection.LAST;
+ orderByExpressions.add(new OrderByExpression(expr, isNullsLast, isAscending));
+ }
+
+ return new OrderBy(orderByExpressions);
+ }
+
+ protected Integer getLimit(Implementor implementor) {
+ // No fetch expression means no limit.
+ if (this.fetch == null)
+ return null;
+ Expression limitExpr = CalciteUtils.toExpression(this.fetch, implementor);
+ if (!limitExpr.isStateless())
+ throw new UnsupportedOperationException("Stateful limit expression not supported");
+ // The expression is evaluated with a null tuple and its value narrowed to an int.
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ limitExpr.evaluate(null, ptr);
+ return ((Number) limitExpr.getDataType().toObject(ptr)).intValue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
new file mode 100644
index 0000000..d66294b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
@@ -0,0 +1,69 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.TableRef;
+
+public class PhoenixClientAggregate extends PhoenixAbstractAggregate {
+
+ public PhoenixClientAggregate(RelOptCluster cluster, RelTraitSet traits,
+ RelNode child, boolean indicator, ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+ super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+ }
+
+ @Override
+ public PhoenixClientAggregate copy(RelTraitSet traits, RelNode input,
+ boolean indicator, ImmutableBitSet groupSet,
+ List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) {
+ return new PhoenixClientAggregate(getCluster(), traits, input, indicator, groupSet, groupSets, aggregateCalls);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ QueryPlan inputPlan = implementor.visitInput(0, (PhoenixRel) getInput());
+ TableRef tableRef = implementor.getTableRef();
+ PhoenixStatement stmt = inputPlan.getContext().getStatement();
+
+ // The aggregation gets its own StatementContext built around a fresh Scan.
+ final StatementContext context;
+ try {
+ context = new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ GroupBy groupBy = super.getGroupBy(implementor);
+ super.serializeAggregators(implementor, context, groupBy.isEmpty());
+ QueryPlan aggPlan = new ClientAggregatePlan(context, inputPlan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, inputPlan);
+ // Expose the result through a projection of the group keys and the aggregate functions.
+ return PhoenixAbstractAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
new file mode 100644
index 0000000..4c7c6b9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@ -0,0 +1,54 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.QueryPlan;
+
+import com.google.common.collect.ImmutableSet;
+
+public class PhoenixClientJoin extends PhoenixAbstractJoin {
+
+ public PhoenixClientJoin(RelOptCluster cluster, RelTraitSet traits,
+ RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType, Set<String> variablesStopped) {
+ super(cluster, traits, left, right, condition, joinType,
+ variablesStopped);
+ }
+
+ @Override
+ public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, RelNode left, RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
+ // Hand this join's variablesStopped on to the copy so it is not lost; semiJoinDone is not used here.
+ return new PhoenixClientJoin(getCluster(), traits, left, right, condition, joinRelType, getVariablesStopped());
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ // Start from this join's own row-count estimate and add each input's row count;
+ // an infinite input row count replaces the running total instead of being added.
+ double totalRows = RelMetadataQuery.getRowCount(this);
+
+ for (RelNode input : getInputs()) {
+ double inputRows = input.getRows();
+ totalRows = Double.isInfinite(inputRows) ? inputRows : totalRows + inputRows;
+ }
+
+ // Only the first cost component is populated; the other two are 0.
+ RelOptCost rowCost = planner.getCostFactory().makeCost(totalRows, 0, 0);
+
+ return rowCost.multiplyBy(PHOENIX_FACTOR);
+ }
+
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ throw new UnsupportedOperationException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
new file mode 100644
index 0000000..2557b43
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
@@ -0,0 +1,45 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
+
+public class PhoenixClientProject extends PhoenixAbstractProject {
+
+ public PhoenixClientProject(RelOptCluster cluster, RelTraitSet traits,
+ RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+ super(cluster, traits, input, projects, rowType);
+ }
+
+ @Override
+ public PhoenixClientProject copy(RelTraitSet traits, RelNode input,
+ List<RexNode> projects, RelDataType rowType) {
+ return new PhoenixClientProject(getCluster(), traits, input, projects, rowType);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ // Visit the input first, then compile this node's projections on top of its plan.
+ QueryPlan inputPlan = implementor.visitInput(0, (PhoenixRel) getInput());
+ TupleProjector projector = project(implementor);
+
+ return new TupleProjectionPlan(inputPlan, projector, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
new file mode 100644
index 0000000..a36d9d0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
@@ -0,0 +1,65 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.schema.TableRef;
+
+public class PhoenixClientSort extends PhoenixAbstractSort {
+
+ public PhoenixClientSort(RelOptCluster cluster, RelTraitSet traits,
+ RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, child, collation, offset, fetch);
+ }
+
+ @Override
+ public PhoenixClientSort copy(RelTraitSet traitSet, RelNode newInput,
+ RelCollation newCollation, RexNode offset, RexNode fetch) {
+ return new PhoenixClientSort(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ // A sort that carries an OFFSET is rejected.
+ if (this.offset != null)
+ throw new UnsupportedOperationException();
+
+ QueryPlan inputPlan = implementor.visitInput(0, (PhoenixRel) getInput());
+ TableRef tableRef = implementor.getTableRef();
+ PhoenixStatement stmt = inputPlan.getContext().getStatement();
+
+ // The sort runs in a new StatementContext built around a fresh Scan.
+ final StatementContext context;
+ try {
+ context = new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ OrderBy orderBy = super.getOrderBy(implementor, null);
+ Integer limit = super.getLimit(implementor);
+ return new ClientScanPlan(context, inputPlan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, limit, null, orderBy, inputPlan);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
new file mode 100644
index 0000000..6e93905
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
@@ -0,0 +1,79 @@
+package org.apache.phoenix.calcite.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
+
+public class PhoenixCompactClientSort extends PhoenixAbstractSort {
+
+ public PhoenixCompactClientSort(RelOptCluster cluster, RelTraitSet traits,
+ RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, child, collation, offset, fetch);
+ }
+
+ @Override
+ public PhoenixCompactClientSort copy(RelTraitSet traitSet, RelNode newInput,
+ RelCollation newCollation, RexNode offset, RexNode fetch) {
+ return new PhoenixCompactClientSort(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner)
+ .multiplyBy(CLIENT_MERGE_FACTOR)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+
+ /**
+ * Builds a new AggregatePlan from the input's AggregatePlan with this node's ORDER BY and limit,
+ * then re-applies the HashJoinPlan (if any) and TupleProjectionPlan wrappers around it.
+ * @throws UnsupportedOperationException if this sort carries an OFFSET
+ */
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ if (this.offset != null)
+ throw new UnsupportedOperationException();
+
+ QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ assert plan instanceof TupleProjectionPlan;
+
+ // PhoenixServerAggregate wraps the AggregatePlan with a TupleProjectionPlan, so unwrap it first.
+ TupleProjectionPlan wrapper = (TupleProjectionPlan) plan;
+ assert wrapper.getPostFilter() == null;
+ QueryPlan innerPlan = wrapper.getDelegate();
+ TupleProjector tupleProjector = wrapper.getTupleProjector();
+ assert (innerPlan instanceof AggregatePlan || innerPlan instanceof HashJoinPlan) && innerPlan.getLimit() == null;
+
+ // The AggregatePlan is either the inner plan itself or the delegate of a HashJoinPlan.
+ HashJoinPlan hashJoinPlan = null;
+ QueryPlan aggregate = innerPlan;
+ if (!(innerPlan instanceof AggregatePlan)) {
+ hashJoinPlan = (HashJoinPlan) innerPlan;
+ aggregate = hashJoinPlan.getDelegate();
+ }
+ assert aggregate instanceof AggregatePlan;
+ AggregatePlan basePlan = (AggregatePlan) aggregate;
+
+ OrderBy orderBy = super.getOrderBy(implementor, tupleProjector);
+ Integer limit = super.getLimit(implementor);
+ QueryPlan sortedPlan = AggregatePlan.create(basePlan, orderBy, limit);
+ if (hashJoinPlan != null) {
+ sortedPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), sortedPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ }
+
+ // Recover the wrapping of TupleProjectionPlan
+ return new TupleProjectionPlan(sortedPlan, tupleProjector, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
new file mode 100644
index 0000000..0827d74
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
@@ -0,0 +1,42 @@
+package org.apache.phoenix.calcite.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.expression.Expression;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Filter}
+ * relational expression in Phoenix.
+ */
+public class PhoenixFilter extends Filter implements PhoenixRel {
+ public PhoenixFilter(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode condition) {
+ super(cluster, traits, input, condition);
+ assert getConvention() == PhoenixRel.CONVENTION;
+ }
+
+ public PhoenixFilter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+ return new PhoenixFilter(getCluster(), traitSet, input, condition);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
+ }
+
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+ QueryPlan inputPlan = implementor.visitInput(0, (PhoenixRel) getInput());
+ Expression filterExpr = CalciteUtils.toExpression(condition, implementor);
+ // Wrap the input plan in a ClientScanPlan that carries the compiled condition and an empty ORDER BY.
+ return new ClientScanPlan(inputPlan.getContext(), inputPlan.getStatement(), inputPlan.getTableRef(), inputPlan.getProjector(), null, filterExpr, OrderBy.EMPTY_ORDER_BY, inputPlan);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java
new file mode 100644
index 0000000..142fb35
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixJoin.java
@@ -0,0 +1,42 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.QueryPlan;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Join relational expression in the Phoenix convention that cannot be executed
+ * itself: its self cost is infinite and {@link #implement} always throws.
+ * NOTE(review): presumably a planning-time placeholder that rules replace with
+ * an executable join -- confirm against the planner rules.
+ */
+public class PhoenixJoin extends Join implements PhoenixRel {
+
+    /**
+     * Creates a join.
+     *
+     * @param cluster          cluster this node belongs to
+     * @param traits           trait set of the node
+     * @param left             left input
+     * @param right            right input
+     * @param condition        join condition
+     * @param joinType         join type
+     * @param variablesStopped names of the variables stopped by this join, handed to the superclass
+     */
+    public PhoenixJoin(RelOptCluster cluster, RelTraitSet traits, RelNode left,
+            RelNode right, RexNode condition, JoinRelType joinType,
+            Set<String> variablesStopped) {
+        super(cluster, traits, left, right, condition, joinType,
+                variablesStopped);
+    }
+
+    /**
+     * Returns a copy with the given traits, condition, inputs and join type.
+     * The copy keeps this node's stopped variables rather than resetting them
+     * to an empty set.
+     */
+    @Override
+    public Join copy(RelTraitSet traits, RexNode condition, RelNode left,
+            RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
+        return new PhoenixJoin(getCluster(), traits, left, right, condition, joinRelType, getVariablesStopped());
+    }
+
+    /** Reports an infinite cost. */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        return planner.getCostFactory().makeCost(Double.POSITIVE_INFINITY, 0, 0);
+    }
+
+    /**
+     * Not supported for this node.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        throw new UnsupportedOperationException();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
new file mode 100644
index 0000000..c7cc60d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -0,0 +1,73 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+
+/**
+ * Relational expression in Phoenix.
+ *
+ * <p>Phoenix evaluates relational expressions using {@link java.util.Iterator}s
+ * over streams of {@link org.apache.phoenix.schema.tuple.Tuple}s.</p>
+ */
+public interface PhoenixRel extends RelNode {
+    /** Calling convention for relational operations that occur in Phoenix. */
+    Convention CONVENTION = new Convention.Impl("PHOENIX", PhoenixRel.class);
+
+    /** Relative cost of Phoenix versus Enumerable convention.
+     *
+     * <p>Multiply by the value (which is less than unity), and you will get a cheaper cost.
+     * Phoenix is cheaper.</p>
+     */
+    double PHOENIX_FACTOR = 0.5;
+
+    /** Relative cost of server plan versus client plan.
+     *
+     * <p>Multiply by the value (which is less than unity), and you will get a cheaper cost.
+     * Server is cheaper.</p>
+     */
+    double SERVER_FACTOR = 0.2;
+
+    /**
+     * Converts this relational expression into a Phoenix query plan.
+     *
+     * @param implementor traversal context used to visit inputs and build expressions
+     * @return the plan for this node
+     */
+    QueryPlan implement(Implementor implementor);
+
+    /** Immutable pair of flags kept on the implementor's context stack during a traversal. */
+    class ImplementorContext {
+        private final boolean retainPKColumns;
+        private final boolean forceProject;
+
+        /**
+         * @param retainPKColumns value later returned by {@link #isRetainPKColumns()}
+         * @param forceProject    value later returned by {@link #forceProject()}
+         */
+        public ImplementorContext(boolean retainPKColumns, boolean forceProject) {
+            this.retainPKColumns = retainPKColumns;
+            this.forceProject = forceProject;
+        }
+
+        /** Returns the {@code retainPKColumns} flag given at construction. */
+        public boolean isRetainPKColumns() {
+            return this.retainPKColumns;
+        }
+
+        /** Returns the {@code forceProject} flag given at construction. */
+        public boolean forceProject() {
+            return this.forceProject;
+        }
+    }
+
+    /** Holds context for a traversal over a tree of relational expressions
+     * to convert it to an executable plan. */
+    interface Implementor {
+        /** Implements the given input (ordinal {@code i} of its parent) and returns its plan. */
+        QueryPlan visitInput(int i, PhoenixRel input);
+
+        /** Creates a column expression for the column at position {@code index} of the current table reference. */
+        ColumnExpression newColumnExpression(int index);
+
+        /** Replaces the current table reference. */
+        void setTableRef(TableRef tableRef);
+
+        /** Returns the current table reference. */
+        TableRef getTableRef();
+
+        /** Pushes a context that becomes the current one until it is popped. */
+        void pushContext(ImplementorContext context);
+
+        /** Removes and returns the current context. */
+        ImplementorContext popContext();
+
+        /** Returns the current context without removing it. */
+        ImplementorContext getCurrentContext();
+
+        /** Creates a projected table from the current table reference. */
+        PTable createProjectedTable();
+
+        /** Creates a row projector over the columns of the current table reference. */
+        RowProjector createRowProjector();
+
+        /** Creates a tuple projector that evaluates the given expressions. */
+        TupleProjector project(List<Expression> exprs);
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
new file mode 100644
index 0000000..2ae3838
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
@@ -0,0 +1,131 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.ExpressionProjector;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.TupleProjectionCompiler;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.Lists;
+
+public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
+ private TableRef tableRef;
+ private Stack<ImplementorContext> contextStack;
+
+ public PhoenixRelImplementorImpl() {
+ this.contextStack = new Stack<ImplementorContext>();
+ pushContext(new ImplementorContext(true, false));
+ }
+
+ @Override
+ public QueryPlan visitInput(int i, PhoenixRel input) {
+ return input.implement(this);
+ }
+
+ @Override
+ public ColumnExpression newColumnExpression(int index) {
+ ColumnRef colRef = new ColumnRef(this.tableRef, index);
+ return colRef.newColumnExpression();
+ }
+
+
+ @Override
+ public void setTableRef(TableRef tableRef) {
+ this.tableRef = tableRef;
+ }
+
+ @Override
+ public TableRef getTableRef() {
+ return this.tableRef;
+ }
+
+ @Override
+ public void pushContext(ImplementorContext context) {
+ this.contextStack.push(context);
+ }
+
+ @Override
+ public ImplementorContext popContext() {
+ return contextStack.pop();
+ }
+
+ @Override
+ public ImplementorContext getCurrentContext() {
+ return contextStack.peek();
+ }
+
+ @Override
+ public PTable createProjectedTable() {
+ List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
+ for (PColumn column : getTableRef().getTable().getColumns()) {
+ sourceColumnRefs.add(new ColumnRef(getTableRef(), column.getPosition()));
+ }
+
+ try {
+ return TupleProjectionCompiler.createProjectedTable(getTableRef(), sourceColumnRefs, getCurrentContext().isRetainPKColumns());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public RowProjector createRowProjector() {
+ List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList();
+ for (PColumn column : getTableRef().getTable().getColumns()) {
+ Expression expr = newColumnExpression(column.getPosition());
+ columnProjectors.add(new ExpressionProjector(column.getName().getString(), getTableRef().getTable().getName().getString(), expr, false));
+ }
+ // TODO get estimate row size
+ return new RowProjector(columnProjectors, 0, false);
+ }
+
+ @Override
+ public TupleProjector project(List<Expression> exprs) {
+ KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
+ List<PColumn> columns = Lists.<PColumn>newArrayList();
+ for (int i = 0; i < exprs.size(); i++) {
+ String name = ParseNodeFactory.createTempAlias();
+ Expression expr = exprs.get(i);
+ builder.addField(expr);
+ columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
+ expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
+ i, expr.getSortOrder(), null, null, false, name));
+ }
+ try {
+ PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
+ PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
+ null, null, columns, null, null, Collections.<PTable>emptyList(),
+ false, Collections.<PName>emptyList(), null, null, false, false, false, null,
+ null, null);
+ this.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
new file mode 100644
index 0000000..3511699
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
@@ -0,0 +1,74 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+
+/**
+ * Aggregate whose {@link #implement} builds an {@link AggregatePlan} directly
+ * on top of a {@link ScanPlan}, or of the scan delegate of a {@link HashJoinPlan}.
+ */
+public class PhoenixServerAggregate extends PhoenixAbstractAggregate {
+
+    public PhoenixServerAggregate(RelOptCluster cluster, RelTraitSet traits,
+            RelNode child, boolean indicator, ImmutableBitSet groupSet,
+            List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
+        super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
+    }
+
+    @Override
+    public PhoenixServerAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) {
+        return new PhoenixServerAggregate(getCluster(), traits, input, indicator, groupSet, groupSets, aggregateCalls);
+    }
+
+    /** Scales the superclass cost by {@code SERVER_FACTOR} and then by {@code PHOENIX_FACTOR}. */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        RelOptCost serverCost = super.computeSelfCost(planner).multiplyBy(SERVER_FACTOR);
+        return serverCost.multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /**
+     * Implements the input, which is expected to yield a limit-free
+     * {@link ScanPlan} or {@link HashJoinPlan}, and layers an
+     * {@link AggregatePlan} over the underlying scan. A hash join found around
+     * the scan is re-created around the aggregate plan.
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        assert getConvention() == getInput().getConvention();
+
+        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        assert (plan instanceof ScanPlan
+                    || plan instanceof HashJoinPlan)
+                && plan.getLimit() == null;
+
+        // Locate the scan: either the plan itself or the delegate of the hash join.
+        HashJoinPlan hashJoinPlan = null;
+        QueryPlan scanCandidate = plan;
+        if (!(plan instanceof ScanPlan)) {
+            hashJoinPlan = (HashJoinPlan) plan;
+            scanCandidate = hashJoinPlan.getDelegate();
+            assert scanCandidate instanceof ScanPlan;
+        }
+        ScanPlan basePlan = (ScanPlan) scanCandidate;
+
+        StatementContext context = basePlan.getContext();
+        GroupBy groupBy = super.getGroupBy(implementor);
+        super.serializeAggregators(implementor, context, groupBy.isEmpty());
+
+        QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
+        if (hashJoinPlan != null) {
+            aggPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+        }
+
+        return PhoenixAbstractAggregate.wrapWithProject(
+                implementor,
+                aggPlan,
+                groupBy.getKeyExpressions(),
+                Arrays.asList(context.getAggregationManager().getAggregators().getFunctions()));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
new file mode 100644
index 0000000..8a4811a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -0,0 +1,125 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.compile.JoinCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+/**
+ * Join whose {@link #implement} produces a {@link HashJoinPlan}: the plan of
+ * the left input is the delegate and the plan of the right input is its single
+ * hash sub-plan.
+ */
+public class PhoenixServerJoin extends PhoenixAbstractJoin {
+
+    public PhoenixServerJoin(RelOptCluster cluster, RelTraitSet traits,
+            RelNode left, RelNode right, RexNode condition,
+            JoinRelType joinType, Set<String> variablesStopped) {
+        super(cluster, traits, left, right, condition, joinType,
+                variablesStopped);
+    }
+
+    /**
+     * Returns a copy with the given traits, condition, inputs and join type.
+     * The copy keeps this node's stopped variables rather than resetting them
+     * to an empty set.
+     */
+    @Override
+    public PhoenixServerJoin copy(RelTraitSet traits, RexNode condition, RelNode left,
+            RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
+        return new PhoenixServerJoin(getCluster(), traits, left, right, condition, joinRelType, getVariablesStopped());
+    }
+
+    /**
+     * Costs the join as its own row count, plus the left input's row count,
+     * plus {@code n log n} of the right input's row count. An infinite input
+     * row count makes the total infinite. The result is scaled by
+     * {@code SERVER_FACTOR} and {@code PHOENIX_FACTOR}.
+     */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        //TODO return infinite cost if RHS size exceeds memory limit.
+
+        double rowCount = RelMetadataQuery.getRowCount(this);
+
+        for (RelNode input : getInputs()) {
+            double inputRowCount = input.getRows();
+            if (Double.isInfinite(inputRowCount)) {
+                rowCount = inputRowCount;
+            } else if (input == getLeft()) {
+                rowCount += inputRowCount;
+            } else {
+                rowCount += Util.nLogN(inputRowCount);
+            }
+        }
+        RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0);
+
+        return cost.multiplyBy(SERVER_FACTOR).multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /**
+     * Implements both inputs with forced projection, builds the join-key
+     * expressions of each side against that side's table reference, and
+     * assembles the hash join plan. Whatever part of the condition is not an
+     * equi-join key becomes the post-join filter.
+     *
+     * @throws RuntimeException wrapping a {@link SQLException} raised while joining the projected tables
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        assert getLeft().getConvention() == PhoenixRel.CONVENTION;
+        assert getRight().getConvention() == PhoenixRel.CONVENTION;
+        PhoenixRel left = (PhoenixRel) getLeft();
+        PhoenixRel right = (PhoenixRel) getRight();
+
+        JoinInfo joinInfo = JoinInfo.of(left, right, getCondition());
+
+        // Left side: inherits the caller's PK-retention setting. The finally block
+        // keeps the context stack balanced even if visiting the input fails.
+        QueryPlan leftPlan;
+        PTable leftTable;
+        List<Expression> leftExprs;
+        implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), true));
+        try {
+            leftPlan = implementor.visitInput(0, left);
+            leftTable = implementor.getTableRef().getTable();
+            // Must run while the implementor's table reference is still the left one.
+            leftExprs = keyExpressions(implementor, joinInfo.leftKeys);
+        } finally {
+            implementor.popContext();
+        }
+
+        // Right side: never retains PK columns.
+        QueryPlan rightPlan;
+        PTable rightTable;
+        List<Expression> rightExprs;
+        implementor.pushContext(new ImplementorContext(false, true));
+        try {
+            rightPlan = implementor.visitInput(1, right);
+            rightTable = implementor.getTableRef().getTable();
+            rightExprs = keyExpressions(implementor, joinInfo.rightKeys);
+        } finally {
+            implementor.popContext();
+        }
+
+        JoinType type = convertJoinType(getJoinType());
+        PTable joinedTable;
+        try {
+            joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        // The post filter is compiled against the joined table.
+        implementor.setTableRef(new TableRef(joinedTable));
+        RexNode postFilter = joinInfo.getRemaining(getCluster().getRexBuilder());
+        Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : CalciteUtils.toExpression(postFilter, implementor);
+        @SuppressWarnings("unchecked")
+        HashJoinInfo hashJoinInfo = new HashJoinInfo(
+                joinedTable, new ImmutableBytesPtr[] {new ImmutableBytesPtr()},
+                (List<Expression>[]) (new List[] {leftExprs}),
+                new JoinType[] {type}, new boolean[] {true},
+                new PTable[] {rightTable},
+                new int[] {leftTable.getColumns().size() - leftTable.getPKColumns().size()},
+                postFilterExpr, null);
+
+        return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)});
+    }
+
+    /**
+     * Builds one column expression per join key against the implementor's
+     * current table reference; when there are no keys, a single constant
+     * {@code 0} is used instead.
+     */
+    private static List<Expression> keyExpressions(Implementor implementor, Iterable<Integer> keys) {
+        List<Expression> exprs = Lists.<Expression> newArrayList();
+        for (Integer index : keys) {
+            exprs.add(implementor.newColumnExpression(index));
+        }
+        if (exprs.isEmpty()) {
+            exprs.add(LiteralExpression.newConstant(0));
+        }
+        return exprs;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
new file mode 100644
index 0000000..f9de2ee
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
@@ -0,0 +1,51 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.TupleProjector;
+
+/**
+ * Project whose {@link #implement} serializes its tuple projector into the
+ * scan of the input plan instead of creating a separate plan node.
+ */
+public class PhoenixServerProject extends PhoenixAbstractProject {
+
+    public PhoenixServerProject(RelOptCluster cluster, RelTraitSet traits,
+            RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+        super(cluster, traits, input, projects, rowType);
+    }
+
+    @Override
+    public PhoenixServerProject copy(RelTraitSet traits, RelNode input,
+            List<RexNode> projects, RelDataType rowType) {
+        return new PhoenixServerProject(getCluster(), traits, input, projects, rowType);
+    }
+
+    /** Scales the superclass cost by {@code SERVER_FACTOR} and {@code PHOENIX_FACTOR}. */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        return super.computeSelfCost(planner)
+                .multiplyBy(SERVER_FACTOR)
+                .multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /**
+     * Implements the input without forced projection, then serializes this
+     * node's projector into the scan of the resulting plan and returns that
+     * same plan. The input plan is expected to be a {@link ScanPlan} or a
+     * {@link HashJoinPlan} whose scan carries no projector yet.
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        assert getConvention() == getInput().getConvention();
+
+        QueryPlan plan;
+        implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().isRetainPKColumns(), false));
+        try {
+            plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        } finally {
+            // Keep the context stack balanced even if visiting the input fails.
+            implementor.popContext();
+        }
+        assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan)
+                && !TupleProjector.hasProjector(plan.getContext().getScan(), plan instanceof ScanPlan);
+
+        TupleProjector tupleProjector = super.project(implementor);
+        TupleProjector.serializeProjectorIntoScan(plan.getContext().getScan(), tupleProjector, plan instanceof ScanPlan);
+        return plan;
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
new file mode 100644
index 0000000..eb4c315
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
@@ -0,0 +1,74 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+
+/**
+ * Sort whose {@link #implement} re-creates the underlying {@link ScanPlan}
+ * with this node's ordering and limit.
+ */
+public class PhoenixServerSort extends PhoenixAbstractSort {
+
+    public PhoenixServerSort(RelOptCluster cluster, RelTraitSet traits,
+            RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
+        super(cluster, traits, child, collation, offset, fetch);
+    }
+
+    @Override
+    public PhoenixServerSort copy(RelTraitSet traitSet, RelNode newInput,
+            RelCollation newCollation, RexNode offset, RexNode fetch) {
+        return new PhoenixServerSort(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+    }
+
+    /** Scales the superclass cost by {@code SERVER_FACTOR} and then by {@code PHOENIX_FACTOR}. */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        RelOptCost serverCost = super.computeSelfCost(planner).multiplyBy(SERVER_FACTOR);
+        return serverCost.multiplyBy(PHOENIX_FACTOR);
+    }
+
+    /**
+     * Implements the input, which is expected to yield a limit-free
+     * {@link ScanPlan} or {@link HashJoinPlan}, and builds a new scan plan
+     * carrying this node's order-by and limit. A hash join found around the
+     * scan is re-created around the new scan plan.
+     *
+     * @throws UnsupportedOperationException if an offset is present
+     * @throws RuntimeException wrapping a {@link SQLException} raised while creating the scan plan
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        assert getConvention() == getInput().getConvention();
+        if (this.offset != null) {
+            throw new UnsupportedOperationException();
+        }
+
+        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        assert (plan instanceof ScanPlan
+                    || plan instanceof HashJoinPlan)
+                && plan.getLimit() == null;
+
+        // Locate the scan: either the plan itself or the delegate of the hash join.
+        HashJoinPlan hashJoinPlan = null;
+        QueryPlan scanCandidate = plan;
+        if (!(plan instanceof ScanPlan)) {
+            hashJoinPlan = (HashJoinPlan) plan;
+            scanCandidate = hashJoinPlan.getDelegate();
+            assert scanCandidate instanceof ScanPlan;
+        }
+        ScanPlan basePlan = (ScanPlan) scanCandidate;
+
+        OrderBy orderBy = super.getOrderBy(implementor, null);
+        Integer limit = super.getLimit(implementor);
+        QueryPlan sortedPlan;
+        try {
+            sortedPlan = ScanPlan.create(basePlan, orderBy, limit);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (hashJoinPlan == null) {
+            return sortedPlan;
+        }
+        return HashJoinPlan.create(hashJoinPlan.getStatement(), sortedPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
new file mode 100644
index 0000000..7902e27
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -0,0 +1,166 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.calcite.rules.PhoenixClientJoinRule;
+import org.apache.phoenix.calcite.rules.PhoenixCompactClientSortRule;
+import org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule;
+import org.apache.phoenix.calcite.rules.PhoenixConverterRules;
+import org.apache.phoenix.calcite.rules.PhoenixServerAggregateRule;
+import org.apache.phoenix.calcite.rules.PhoenixServerJoinRule;
+import org.apache.phoenix.calcite.rules.PhoenixServerProjectRule;
+import org.apache.phoenix.calcite.rules.PhoenixServerSortRule;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.WhereCompiler;
+import org.apache.phoenix.compile.WhereOptimizer;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Scan of a Phoenix table, optionally carrying a filter predicate.
+ */
+public class PhoenixTableScan extends TableScan implements PhoenixRel {
+    /** Predicate applied by the scan; {@code null} when there is none. */
+    public final RexNode filter;
+
+    public PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter) {
+        super(cluster, traits, table);
+        this.filter = filter;
+    }
+
+    /** A scan has no inputs; this node is returned unchanged. */
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        assert inputs.isEmpty();
+        return this;
+    }
+
+    /** Registers the Phoenix converter rules followed by the Phoenix planner rules, in a fixed order. */
+    @Override
+    public void register(RelOptPlanner planner) {
+        for (RelOptRule converterRule : PhoenixConverterRules.RULES) {
+            planner.addRule(converterRule);
+        }
+        RelOptRule[] phoenixRules = {
+            PhoenixFilterScanMergeRule.INSTANCE,
+            PhoenixServerProjectRule.PROJECT_SCAN,
+            PhoenixServerProjectRule.PROJECT_SERVERJOIN,
+            PhoenixServerJoinRule.JOIN_SCAN,
+            PhoenixServerJoinRule.JOIN_SERVERPROJECT_SCAN,
+            PhoenixServerAggregateRule.AGGREGATE_SCAN,
+            PhoenixServerAggregateRule.AGGREGATE_SERVERJOIN,
+            PhoenixServerAggregateRule.AGGREGATE_SERVERPROJECT,
+            PhoenixServerSortRule.SORT_SCAN,
+            PhoenixServerSortRule.SORT_SERVERJOIN,
+            PhoenixServerSortRule.SORT_SERVERPROJECT,
+            PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE,
+            PhoenixClientJoinRule.INSTANCE,
+        };
+        for (RelOptRule phoenixRule : phoenixRules) {
+            planner.addRule(phoenixRule);
+        }
+    }
+
+    /** Adds the {@code filter} term to the explain output when a filter is present. */
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        RelWriter writer = super.explainTerms(pw);
+        return writer.itemIf("filter", filter, filter != null);
+    }
+
+    /**
+     * Scales the superclass cost by {@code PHOENIX_FACTOR} and, when a filter
+     * that is not always true is present, by that filter's selectivity.
+     */
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        RelOptCost scanCost = super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
+        if (filter == null || filter.isAlwaysTrue()) {
+            return scanCost;
+        }
+        final Double selectivity = RelMetadataQuery.getSelectivity(this, filter);
+        return scanCost.multiplyBy(selectivity);
+    }
+
+    /** Returns the superclass row estimate multiplied by the selectivity reported for the filter. */
+    @Override
+    public double getRows() {
+        double unfilteredRows = super.getRows();
+        return unfilteredRows * RelMetadataQuery.getSelectivity(this, filter);
+    }
+
+    /**
+     * Builds a {@link ScanPlan} for the table. The implementor's table
+     * reference is set to this table before the filter is translated. When the
+     * current context forces projection, a projector is serialized into the
+     * scan and the implementor's table reference is replaced by the projected
+     * table, while the returned plan keeps the original table reference.
+     *
+     * @throws RuntimeException wrapping any {@link SQLException} raised while compiling
+     */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
+        PTable pTable = phoenixTable.getTable();
+        TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false);
+        implementor.setTableRef(tableRef);
+        try {
+            PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
+            ColumnResolver resolver = FromCompiler.getResolver(tableRef);
+            StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
+            SelectStatement select = SelectStatement.SELECT_STAR;
+            if (filter != null) {
+                pushFilterIntoScan(implementor, context, select);
+            }
+            projectAllColumnFamilies(context.getScan(), phoenixTable.getTable());
+            if (implementor.getCurrentContext().forceProject()) {
+                forceProjection(implementor, context, phoenixTable);
+            }
+            // Typed locals (rather than bare nulls) keep constructor resolution unchanged.
+            Integer limit = null;
+            OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
+            ParallelIteratorFactory iteratorFactory = null;
+            return new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Translates the filter, pushes key expressions into the scan and installs what is left as the scan filter. */
+    private void pushFilterIntoScan(Implementor implementor, StatementContext context, SelectStatement select) throws SQLException {
+        Expression translated = CalciteUtils.toExpression(filter, implementor);
+        Expression remaining = WhereOptimizer.pushKeyExpressionsToScan(context, select, translated);
+        WhereCompiler.setScanFilter(context, select, remaining, true, false);
+    }
+
+    /** Serializes a projector into the scan and points the implementor at the projected table. */
+    private void forceProjection(Implementor implementor, StatementContext context, PhoenixTable phoenixTable) throws SQLException {
+        TupleProjector tupleProjector = createTupleProjector(implementor, phoenixTable.getTable());
+        TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+        PTable projectedTable = implementor.createProjectedTable();
+        implementor.setTableRef(new TableRef(projectedTable));
+    }
+
+    /**
+     * Builds a projector over the table's columns, skipping PK columns when
+     * the current context retains them.
+     */
+    private TupleProjector createTupleProjector(Implementor implementor, PTable table) {
+        KeyValueSchemaBuilder schemaBuilder = new KeyValueSchemaBuilder(0);
+        List<Expression> projected = Lists.<Expression> newArrayList();
+        for (PColumn col : table.getColumns()) {
+            if (SchemaUtil.isPKColumn(col) && implementor.getCurrentContext().isRetainPKColumns()) {
+                continue;
+            }
+            Expression colExpr = implementor.newColumnExpression(col.getPosition());
+            projected.add(colExpr);
+            schemaBuilder.addField(colExpr);
+        }
+
+        Expression[] projectedArray = projected.toArray(new Expression[projected.size()]);
+        return new TupleProjector(schemaBuilder.build(), projectedArray);
+    }
+
+    // TODO only project needed columns
+    /** Clears the scan's family map and adds every column family of the table. */
+    private void projectAllColumnFamilies(Scan scan, PTable table) {
+        scan.getFamilyMap().clear();
+        for (PColumnFamily cf : table.getColumnFamilies()) {
+            byte[] familyName = cf.getName().getBytes();
+            scan.addFamily(familyName);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
new file mode 100644
index 0000000..3916102
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
@@ -0,0 +1,102 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodCallExpression;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.phoenix.calcite.BuiltInMethod;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.execute.DelegateQueryPlan;
+import org.apache.phoenix.iterate.ResultIterator;
+
+/**
+ * Converter that exposes a Phoenix relational expression as an
+ * {@link EnumerableRel}: {@link #implement} turns its input (cast to
+ * {@link PhoenixRel}) into a {@link QueryPlan}, stashes that plan with the
+ * enumerable implementor and emits code that passes the stashed value to
+ * {@code BuiltInMethod.TO_ENUMERABLE}.
+ */
+public class PhoenixToEnumerableConverter extends ConverterImpl implements EnumerableRel {
+ /** Creates a converter over {@code input}, using {@link ConventionTraitDef} as the converted trait definition. */
+ public PhoenixToEnumerableConverter(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode input) {
+ super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+ }
+
+ /** Returns a new converter on the same cluster with the given traits and the sole element of {@code inputs}. */
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new PhoenixToEnumerableConverter(getCluster(), traitSet, sole(inputs));
+ }
+
+ /** Returns the inherited self cost scaled by a factor of 0.1. */
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+
+ /**
+ * Builds the Phoenix {@link QueryPlan} for the input and generates the
+ * enumerable code block that returns it converted to an enumerable.
+ */
+ @Override
+ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+ // Generates code that fetches the stashed query plan from the root
+ // expression and converts it to an enumerable, roughly:
+ //
+ // QueryPlan iterator = (QueryPlan) root.get("x");
+ // return <BuiltInMethod.TO_ENUMERABLE>(iterator);
+ final BlockBuilder list = new BlockBuilder();
+ QueryPlan plan = makePlan((PhoenixRel)getInput());
+ Expression var = stash(implementor, plan, QueryPlan.class);
+ final RelDataType rowType = getRowType();
+ final PhysType physType =
+ PhysTypeImpl.of(
+ implementor.getTypeFactory(), rowType,
+ pref.prefer(JavaRowFormat.ARRAY));
+ final Expression iterator_ =
+ list.append("iterator", var);
+ final Expression enumerable_ =
+ list.append("enumerable",
+ Expressions.call(BuiltInMethod.TO_ENUMERABLE.method,
+ iterator_));
+ list.add(Expressions.return_(null, enumerable_));
+ return implementor.result(physType, list.toBlock());
+ }
+
+ /**
+ * Runs a fresh {@link PhoenixRelImplementorImpl} over {@code rel} (visited
+ * as input 0) and wraps the resulting plan so that
+ * {@link QueryPlan#getProjector()} is answered by the implementor's
+ * {@code createRowProjector()}; iteration and explain-plan requests are
+ * forwarded to the wrapped plan.
+ */
+ static QueryPlan makePlan(PhoenixRel rel) {
+ final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl();
+ final QueryPlan plan = phoenixImplementor.visitInput(0, rel);
+ return new DelegateQueryPlan(plan) {
+ @Override
+ public ResultIterator iterator() throws SQLException {
+ return delegate.iterator();
+ }
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return delegate.getExplainPlan();
+ }
+ @Override
+ public RowProjector getProjector() {
+ // Asks the implementor at call time rather than using the delegate's projector.
+ return phoenixImplementor.createRowProjector();
+ }
+ };
+ }
+
+ /**
+ * Stashes {@code o} with the implementor and returns an expression that
+ * looks the stashed value up by name on the root expression (via
+ * {@code DATA_CONTEXT_GET}) and converts the result to {@code clazz}.
+ */
+ static Expression stash(EnumerableRelImplementor implementor, Object o, Class clazz) {
+ // NOTE(review): the cast assumes implementor.stash returns a ParameterExpression for this kind of value; confirm.
+ ParameterExpression x = (ParameterExpression) implementor.stash(o, clazz);
+ MethodCallExpression e =
+ Expressions.call(implementor.getRootExpression(),
+ org.apache.calcite.util.BuiltInMethod.DATA_CONTEXT_GET.method,
+ Expressions.constant(x.name));
+ return Expressions.convert_(e, clazz);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
new file mode 100644
index 0000000..787b2f1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
@@ -0,0 +1,37 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.phoenix.compile.QueryPlan;
+
+/**
+ * Phoenix-convention variant of {@link org.apache.calcite.rel.core.Union}.
+ * Plan generation is not supported: {@link #implement} visits every input
+ * and then throws {@link UnsupportedOperationException}.
+ */
+public class PhoenixUnion extends Union implements PhoenixRel {
+    /** Creates the union; the resulting node must carry {@link PhoenixRel#CONVENTION}. */
+    public PhoenixUnion(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+        super(cluster, traits, inputs, all);
+        assert getConvention() == PhoenixRel.CONVENTION;
+    }
+
+    @Override
+    public PhoenixUnion copy(RelTraitSet traits, List<RelNode> inputs, boolean all) {
+        return new PhoenixUnion(getCluster(), traits, inputs, all);
+    }
+
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        // First pass: every input must share this node's convention.
+        final List<RelNode> children = getInputs();
+        for (int i = 0; i < children.size(); i++) {
+            assert getConvention() == children.get(i).getConvention();
+        }
+        // Second pass: visit the inputs by ordinal before giving up.
+        for (int ordinal = 0; ordinal < inputs.size(); ordinal++) {
+            implementor.visitInput(ordinal, (PhoenixRel) inputs.get(ordinal));
+        }
+        throw new UnsupportedOperationException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
new file mode 100644
index 0000000..f1a626b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
@@ -0,0 +1,37 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.phoenix.compile.QueryPlan;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Implementation of {@link org.apache.calcite.rel.core.Values}
+ * relational expression in Phoenix.
+ */
+public class PhoenixValues extends Values implements PhoenixRel {
+    /**
+     * Creates a Phoenix values node.
+     *
+     * @param cluster cluster the node belongs to
+     * @param rowType row type of the produced tuples
+     * @param tuples literal rows
+     * @param traits trait set; must carry {@link PhoenixRel#CONVENTION}
+     */
+    public PhoenixValues(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) {
+        super(cluster, rowType, tuples, traits);
+        assert getConvention() == PhoenixRel.CONVENTION;
+    }
+
+    /**
+     * Copies this node with a new trait set. A values node has no inputs,
+     * so {@code inputs} must be empty.
+     */
+    @Override
+    public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        // The copy goes through the constructor, which insists on the Phoenix
+        // convention. Asserting Convention.NONE here contradicted that check,
+        // so no trait set carrying a convention could pass both assertions.
+        assert traitSet.containsIfApplicable(PhoenixRel.CONVENTION);
+        assert inputs.isEmpty();
+        return new PhoenixValues(getCluster(), rowType, tuples, traitSet);
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        throw new UnsupportedOperationException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java
new file mode 100644
index 0000000..99ba81f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixClientJoinRule.java
@@ -0,0 +1,55 @@
+package org.apache.phoenix.calcite.rules;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
+import org.apache.phoenix.calcite.rel.PhoenixClientSort;
+import org.apache.phoenix.calcite.rel.PhoenixJoin;
+import org.apache.phoenix.calcite.rel.PhoenixRel;
+import com.google.common.collect.Lists;
+
+/**
+ * Planner rule that matches a {@link PhoenixJoin} and replaces it with a
+ * {@link PhoenixClientJoin} whose left and right inputs are each wrapped in
+ * a {@link PhoenixClientSort} ordered on that side's join keys.
+ */
+public class PhoenixClientJoinRule extends RelOptRule {
+
+    /** Shared rule instance; final so it cannot be reassigned by other code. */
+    public static final PhoenixClientJoinRule INSTANCE = new PhoenixClientJoinRule();
+
+    public PhoenixClientJoinRule() {
+        super(operand(PhoenixJoin.class, any()), "PhoenixClientJoinRule");
+    }
+
+    /**
+     * Derives the join keys of both sides from the join condition, sorts
+     * each input on its keys and registers the resulting client join as an
+     * equivalent of the matched join.
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        PhoenixJoin join = call.rel(0);
+        RelNode left = join.getLeft();
+        RelNode right = join.getRight();
+        JoinInfo joinInfo = JoinInfo.of(left, right, join.getCondition());
+
+        RelNode newLeft = sortOnKeys(left, joinInfo.leftKeys);
+        RelNode newRight = sortOnKeys(right, joinInfo.rightKeys);
+
+        call.transformTo(new PhoenixClientJoin(join.getCluster(),
+                join.getTraitSet(), newLeft, newRight, join.getCondition(),
+                join.getJoinType(), join.getVariablesStopped()));
+    }
+
+    /**
+     * Wraps {@code input} in a {@link PhoenixClientSort} that orders it by
+     * the given key ordinals, each ascending with nulls first. The sort has
+     * no offset and no fetch.
+     */
+    private static RelNode sortOnKeys(RelNode input, Iterable<Integer> keys) {
+        List<RelFieldCollation> fieldCollations = Lists.newArrayList();
+        for (Integer key : keys) {
+            fieldCollations.add(new RelFieldCollation(key, Direction.ASCENDING, NullDirection.FIRST));
+        }
+        RelCollation collation = RelCollations.of(fieldCollations);
+        return new PhoenixClientSort(input.getCluster(),
+                input.getTraitSet().replace(PhoenixRel.CONVENTION).replace(collation),
+                input, collation, null, null);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java
index 63cd60e..d1f4ec7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixCompactClientSortRule.java
@@ -2,10 +2,10 @@ package org.apache.phoenix.calcite.rules;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.phoenix.calcite.PhoenixClientSort;
-import org.apache.phoenix.calcite.PhoenixCompactClientSort;
-import org.apache.phoenix.calcite.PhoenixRel;
-import org.apache.phoenix.calcite.PhoenixServerAggregate;
+import org.apache.phoenix.calcite.rel.PhoenixClientSort;
+import org.apache.phoenix.calcite.rel.PhoenixCompactClientSort;
+import org.apache.phoenix.calcite.rel.PhoenixRel;
+import org.apache.phoenix.calcite.rel.PhoenixServerAggregate;
public class PhoenixCompactClientSortRule extends RelOptRule {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index e426637..c6a5d36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -11,17 +11,19 @@ import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.util.trace.CalciteTrace;
-import org.apache.phoenix.calcite.PhoenixAggregate;
-import org.apache.phoenix.calcite.PhoenixClientAggregate;
-import org.apache.phoenix.calcite.PhoenixClientJoin;
-import org.apache.phoenix.calcite.PhoenixClientProject;
-import org.apache.phoenix.calcite.PhoenixClientSort;
-import org.apache.phoenix.calcite.PhoenixFilter;
-import org.apache.phoenix.calcite.PhoenixProject;
-import org.apache.phoenix.calcite.PhoenixRel;
-import org.apache.phoenix.calcite.PhoenixSort;
-import org.apache.phoenix.calcite.PhoenixToEnumerableConverter;
-import org.apache.phoenix.calcite.PhoenixUnion;
+import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
+import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
+import org.apache.phoenix.calcite.rel.PhoenixClientProject;
+import org.apache.phoenix.calcite.rel.PhoenixClientSort;
+import org.apache.phoenix.calcite.rel.PhoenixFilter;
+import org.apache.phoenix.calcite.rel.PhoenixAbstractProject;
+import org.apache.phoenix.calcite.rel.PhoenixJoin;
+import org.apache.phoenix.calcite.rel.PhoenixRel;
+import org.apache.phoenix.calcite.rel.PhoenixAbstractSort;
+import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter;
+import org.apache.phoenix.calcite.rel.PhoenixUnion;
+
+import com.google.common.base.Predicate;
import java.util.logging.Logger;
@@ -50,24 +52,41 @@ public class PhoenixConverterRules {
abstract static class PhoenixConverterRule extends ConverterRule {
protected final Convention out;
public PhoenixConverterRule(
- Class<? extends RelNode> clazz,
- RelTrait in,
- Convention out,
- String description) {
+ Class<? extends RelNode> clazz,
+ RelTrait in,
+ Convention out,
+ String description) {
super(clazz, in, out, description);
this.out = out;
}
+
+ public <R extends RelNode> PhoenixConverterRule(
+ Class<R> clazz,
+ Predicate<? super R> predicate,
+ RelTrait in,
+ Convention out,
+ String description) {
+ super(clazz, predicate, in, out, description);
+ this.out = out;
+ }
}
/**
* Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
- * {@link PhoenixSort}.
+ * {@link PhoenixAbstractSort}.
*/
private static class PhoenixSortRule extends PhoenixConverterRule {
+ private static Predicate<LogicalSort> NON_EMPTY_COLLATION = new Predicate<LogicalSort>() {
+ @Override
+ public boolean apply(LogicalSort input) {
+ return !input.getCollation().getFieldCollations().isEmpty();
+ }
+ };
+
public static final PhoenixSortRule INSTANCE = new PhoenixSortRule();
private PhoenixSortRule() {
- super(LogicalSort.class, Convention.NONE, PhoenixRel.CONVENTION,
+ super(LogicalSort.class, NON_EMPTY_COLLATION, Convention.NONE, PhoenixRel.CONVENTION,
"PhoenixSortRule");
}
@@ -107,7 +126,7 @@ public class PhoenixConverterRules {
/**
* Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
- * to a {@link PhoenixProject}.
+ * to a {@link PhoenixAbstractProject}.
*/
private static class PhoenixProjectRule extends PhoenixConverterRule {
private static final PhoenixProjectRule INSTANCE = new PhoenixProjectRule();
@@ -128,7 +147,7 @@ public class PhoenixConverterRules {
/**
* Rule to convert a {@link org.apache.calcite.rel.logical.LogicalAggregate}
- * to an {@link PhoenixAggregate}.
+ * to an {@link PhoenixAbstractAggregate}.
*/
private static class PhoenixAggregateRule extends PhoenixConverterRule {
public static final RelOptRule INSTANCE = new PhoenixAggregateRule();
@@ -175,7 +194,7 @@ public class PhoenixConverterRules {
/**
* Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
- * {@link PhoenixSort}.
+ * {@link PhoenixAbstractSort}.
*/
private static class PhoenixJoinRule extends PhoenixConverterRule {
public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule();
@@ -189,7 +208,7 @@ public class PhoenixConverterRules {
final LogicalJoin join = (LogicalJoin) rel;
final RelTraitSet traitSet =
join.getTraitSet().replace(out);
- return new PhoenixClientJoin(rel.getCluster(), traitSet,
+ return new PhoenixJoin(rel.getCluster(), traitSet,
convert(join.getLeft(), join.getLeft().getTraitSet().replace(out)),
convert(join.getRight(), join.getRight().getTraitSet().replace(out)),
join.getCondition(),
@@ -343,7 +362,7 @@ public class PhoenixConverterRules {
/**
* Rule to convert a relational expression from
- * {@link org.apache.phoenix.calcite.PhoenixRel#CONVENTION} to
+ * {@link org.apache.phoenix.calcite.rel.PhoenixRel#CONVENTION} to
* {@link org.apache.calcite.adapter.enumerable.EnumerableConvention}.
*/
public static class PhoenixToEnumerableConverterRule extends ConverterRule {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a0aca7f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
index a456c7a..dd0f119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
@@ -5,7 +5,7 @@ import com.google.common.base.Predicate;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Filter;
-import org.apache.phoenix.calcite.PhoenixTableScan;
+import org.apache.phoenix.calcite.rel.PhoenixTableScan;
public class PhoenixFilterScanMergeRule extends RelOptRule {