You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/02 21:13:56 UTC
phoenix git commit: Passed another two tests: aggregate+project,
aggregate+join
Repository: phoenix
Updated Branches:
refs/heads/calcite 8097c8b9d -> b01bdd172
Passed another two tests: aggregate+project, aggregate+join
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b01bdd17
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b01bdd17
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b01bdd17
Branch: refs/heads/calcite
Commit: b01bdd1721ca6b2425c9011bda09c339128459e4
Parents: 8097c8b
Author: maryannxue <we...@intel.com>
Authored: Thu Apr 2 15:13:47 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Thu Apr 2 15:13:47 2015 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/calcite/CalciteTest.java | 26 ++++++
.../phoenix/calcite/PhoenixAggregate.java | 84 ++++++++------------
.../org/apache/phoenix/calcite/PhoenixJoin.java | 2 +-
.../apache/phoenix/calcite/PhoenixProject.java | 33 ++------
.../org/apache/phoenix/calcite/PhoenixRel.java | 5 ++
.../calcite/PhoenixRelImplementorImpl.java | 36 +++++++++
.../phoenix/calcite/PhoenixTableScan.java | 6 +-
7 files changed, 113 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index 70d44f6..333315c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -256,6 +256,7 @@ public class CalciteTest extends BaseClientManagedTimeIT {
{"00A323122312312", "a", "00D300000000XHP"},
{"00A423122312312", "a", "00D300000000XHP"}})
.close();
+
start().sql("select t1.entity_id, t2.a_string, t3.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id join atable t3 on t1.entity_id = t3.entity_id and t1.organization_id = t3.organization_id")
.explainIs("PhoenixToEnumerableConverter\n" +
" PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], ORGANIZATION_ID=[$36])\n" +
@@ -287,6 +288,31 @@ public class CalciteTest extends BaseClientManagedTimeIT {
{"b", 4L},
{"c", 1L}})
.close();
+
+ start().sql("select count(entity_id), a_string from atable group by a_string")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+ " PhoenixAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]], project=[[$2]])\n")
+ .resultIs(new Object[][] {
+ {4L, "a"},
+ {4L, "b"},
+ {1L, "c"}})
+ .close();
+
+ start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+ " PhoenixProject(NAME=[$1])\n" +
+ " PhoenixJoin(condition=[=($0, $2)], joinType=[inner])\n" +
+ " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]], project=[[$0, $1]])\n" +
+ " PhoenixTableScan(table=[[phoenix, ITEMTABLE]], project=[[$5]])\n")
+ .resultIs(new Object[][] {
+ {"S1", 2L},
+ {"S2", 2L},
+ {"S5", 1L},
+ {"S6", 1L}})
+ .close();
}
@Test public void testSubquery() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
index 7a38f25..0c620c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
@@ -7,6 +7,7 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
@@ -27,7 +28,8 @@ import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.ClientAggregatePlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.aggregator.ClientAggregators;
@@ -36,12 +38,9 @@ import org.apache.phoenix.expression.function.AggregateFunction;
import org.apache.phoenix.expression.function.SingleAggregateFunction;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.RowKeyValueAccessor;
import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PDecimal;
-import org.apache.phoenix.schema.types.PVarchar;
-
import com.google.common.collect.Lists;
/**
@@ -49,6 +48,8 @@ import com.google.common.collect.Lists;
* relational expression in Phoenix.
*/
public class PhoenixAggregate extends Aggregate implements PhoenixRel {
+ private static double SERVER_AGGREGATE_FACTOR = 0.2;
+
public PhoenixAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) throws InvalidRelException {
super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
assert getConvention() == PhoenixRel.CONVENTION;
@@ -68,7 +69,11 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
+ RelOptCost cost = super.computeSelfCost(planner);
+ if (isServerAggregate()) {
+ cost = cost.multiplyBy(SERVER_AGGREGATE_FACTOR);
+ }
+ return cost.multiplyBy(PHOENIX_FACTOR);
}
@Override
@@ -117,25 +122,13 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
String groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
// TODO sort group by keys. not sure if there is a way to avoid this sorting,
// otherwise we would have add an extra projection.
- List<Expression> exprs = Lists.newArrayListWithExpectedSize(ordinals.size());
- List<Expression> keyExprs = exprs;
+ // TODO convert key types. can be avoided?
+ List<Expression> keyExprs = Lists.newArrayListWithExpectedSize(ordinals.size());
for (int i = 0; i < ordinals.size(); i++) {
Expression expr = implementor.newColumnExpression(ordinals.get(i));
- exprs.add(expr);
- PDataType keyType = getKeyType(expr);
- if (keyType == expr.getDataType()) {
- continue;
- }
- if (keyExprs == exprs) {
- keyExprs = Lists.newArrayList(exprs);
- }
- try {
- keyExprs.set(i, CoerceExpression.create(expr, keyType));
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ keyExprs.add(expr);
}
- GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(exprs).setKeyExpressions(keyExprs).build();
+ GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(keyExprs).setKeyExpressions(keyExprs).build();
// TODO sort aggFuncs. same problem with group by key sorting.
List<SingleAggregateFunction> aggFuncs = Lists.newArrayList();
@@ -152,46 +145,39 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
context.getAggregationManager().setAggregators(clientAggregators);
SelectStatement select = SelectStatement.SELECT_STAR;
- RowProjector rowProjector = createRowProjector(keyExprs, aggFuncs);
+ QueryPlan aggPlan;
if (basePlan == null) {
- return new ClientAggregatePlan(context, select, tableRef, rowProjector, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan);
+ aggPlan = new ClientAggregatePlan(context, select, tableRef, implementor.createRowProjector(), null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan);
+ } else {
+ aggPlan = new AggregatePlan(context, select, basePlan.getTableRef(), implementor.createRowProjector(), null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
+ if (plan instanceof HashJoinPlan) {
+ HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
+ aggPlan = HashJoinPlan.create(select, aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ }
}
- QueryPlan aggPlan = new AggregatePlan(context, select, basePlan.getTableRef(), rowProjector, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
- if (plan instanceof ScanPlan)
- return aggPlan;
-
- HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
- return HashJoinPlan.create(select, aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
- }
-
- private static RowProjector createRowProjector(List<Expression> keyExprs, List<SingleAggregateFunction> aggFuncs) {
- List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList();
+ List<Expression> exprs = Lists.newArrayList();
for (int i = 0; i < keyExprs.size(); i++) {
Expression keyExpr = keyExprs.get(i);
RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExprs, i);
Expression expr = new RowKeyColumnExpression(keyExpr, accessor, keyExpr.getDataType());
- columnProjectors.add(new ExpressionProjector(expr.toString(), "", expr, false));
+ exprs.add(expr);
}
for (SingleAggregateFunction aggFunc : aggFuncs) {
- columnProjectors.add(new ExpressionProjector(aggFunc.toString(), "", aggFunc, false));
+ exprs.add(aggFunc);
}
- return new RowProjector(columnProjectors, 0, false);
+ TupleProjector tupleProjector = implementor.project(exprs);
+ PTable projectedTable = implementor.createProjectedTable();
+ implementor.setTableRef(new TableRef(projectedTable));
+ return new TupleProjectionPlan(aggPlan, tupleProjector, null, implementor.createRowProjector());
}
- private static PDataType getKeyType(Expression expression) {
- PDataType type = expression.getDataType();
- if (!expression.isNullable() || !type.isFixedWidth()) {
- return type;
- }
- if (type.isCastableTo(PDecimal.INSTANCE)) {
- return PDecimal.INSTANCE;
- }
- if (type.isCastableTo(PVarchar.INSTANCE)) {
- return PVarchar.INSTANCE;
+ public boolean isServerAggregate() {
+ RelNode rel = getInput();
+ if (rel instanceof RelSubset) {
+ rel = ((RelSubset) rel).getBest();
}
- // This might happen if someone tries to group by an array
- throw new IllegalStateException("Multiple occurrences of type " + type + " may not occur in a GROUP BY clause");
+ return (rel instanceof PhoenixTableScan) || (rel instanceof PhoenixJoin && ((PhoenixJoin) rel).isHashJoinDoable());
}
private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
index e5f9cda..a1384a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java
@@ -123,7 +123,7 @@ public class PhoenixJoin extends Join implements PhoenixRel {
return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)});
}
- private boolean isHashJoinDoable() {
+ public boolean isHashJoinDoable() {
// TODO check memory limit
RelNode rel = getLeft();
if (rel instanceof RelSubset) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
index b1ca8fa..6b82f42 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
@@ -54,36 +54,13 @@ public class PhoenixProject extends Project implements PhoenixRel {
assert getConvention() == getInput().getConvention();
QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
- TupleProjector tupleProjector = project(implementor, getProjects());
+ List<Expression> exprs = Lists.newArrayList();
+ for (RexNode project : getProjects()) {
+ exprs.add(CalciteUtils.toExpression(project, implementor));
+ }
+ TupleProjector tupleProjector = implementor.project(exprs);
PTable projectedTable = implementor.createProjectedTable();
implementor.setTableRef(new TableRef(projectedTable));
return new TupleProjectionPlan(plan, tupleProjector, null, implementor.createRowProjector());
}
-
- protected static TupleProjector project(Implementor implementor, List<RexNode> projects) {
- KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
- Expression[] exprs = new Expression[projects.size()];
- List<PColumn> columns = Lists.<PColumn>newArrayList();
- for (int i = 0; i < projects.size(); i++) {
- String name = projects.get(i).toString();
- Expression expr = CalciteUtils.toExpression(projects.get(i), implementor);
- builder.addField(expr);
- exprs[i] = expr;
- columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
- expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
- i, expr.getSortOrder(), null, null, false, name));
- }
- try {
- PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
- PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
- null, null, columns, null, null, Collections.<PTable>emptyList(),
- false, Collections.<PName>emptyList(), null, null, false, false, false, null,
- null, null);
- implementor.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false));
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
-
- return new TupleProjector(builder.build(), exprs);
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
index 27a7b0e..d89cdab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
@@ -1,10 +1,14 @@
package org.apache.phoenix.calcite;
+import java.util.List;
+
import org.apache.calcite.plan.Convention;
import org.apache.calcite.rel.RelNode;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
@@ -51,5 +55,6 @@ public interface PhoenixRel extends RelNode {
ImplementorContext getCurrentContext();
PTable createProjectedTable();
RowProjector createRowProjector();
+ TupleProjector project(List<Expression> exprs);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
index ef92f34..67e1fd0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
@@ -1,20 +1,30 @@
package org.apache.phoenix.calcite;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.List;
import java.util.Stack;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.calcite.PhoenixRel.ImplementorContext;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ExpressionProjector;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.TupleProjectionCompiler;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.ColumnExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
import com.google.common.collect.Lists;
@@ -89,5 +99,31 @@ class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
// TODO get estimate row size
return new RowProjector(columnProjectors, 0, false);
}
+
+ @Override
+ public TupleProjector project(List<Expression> exprs) {
+ KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
+ List<PColumn> columns = Lists.<PColumn>newArrayList();
+ for (int i = 0; i < exprs.size(); i++) {
+ Expression expr = exprs.get(i);
+ String name = expr.toString();
+ builder.addField(expr);
+ columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
+ expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
+ i, expr.getSortOrder(), null, null, false, name));
+ }
+ try {
+ PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
+ PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
+ null, null, columns, null, null, Collections.<PTable>emptyList(),
+ false, Collections.<PName>emptyList(), null, null, false, false, false, null,
+ null, null);
+ this.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
index f681c88..e21d28f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
@@ -121,7 +121,11 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
if (projects == null) {
tupleProjector = createTupleProjector(implementor, phoenixTable.getTable());
} else {
- tupleProjector = PhoenixProject.project(implementor, this.projects);
+ List<Expression> exprs = Lists.newArrayList();
+ for (RexNode project : this.projects) {
+ exprs.add(CalciteUtils.toExpression(project, implementor));
+ }
+ tupleProjector = implementor.project(exprs);
}
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
PTable projectedTable = implementor.createProjectedTable();