You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/01 18:30:30 UTC
phoenix git commit: First aggregate query passed
Repository: phoenix
Updated Branches:
refs/heads/calcite 208150098 -> 8097c8b9d
First aggregate query passed
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8097c8b9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8097c8b9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8097c8b9
Branch: refs/heads/calcite
Commit: 8097c8b9deb083bece69f170be319b95d3930df5
Parents: 2081500
Author: maryannxue <we...@intel.com>
Authored: Wed Apr 1 12:30:15 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Wed Apr 1 12:30:15 2015 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/calcite/CalciteTest.java | 28 ++++
.../apache/phoenix/calcite/CalciteUtils.java | 50 ++++++
.../phoenix/calcite/PhoenixAggregate.java | 160 ++++++++++++++++++-
.../apache/phoenix/calcite/PhoenixSchema.java | 2 +-
.../phoenix/execute/DelegateQueryPlan.java | 4 +
.../apache/phoenix/execute/HashJoinPlan.java | 8 +
.../java/org/apache/phoenix/util/TestUtil.java | 4 +-
7 files changed, 251 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index a9ad76b..70d44f6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -15,6 +15,7 @@ import java.sql.*;
import java.util.List;
import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
import static org.junit.Assert.*;
@@ -275,6 +276,33 @@ public class CalciteTest extends BaseClientManagedTimeIT {
{"00C923122312312", "c", "00D300000000XHP"}})
.close();
}
+
+ @Test public void testAggregate() {
+ start().sql("select a_string, count(entity_id) from atable group by a_string")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]], project=[[$2]])\n")
+ .resultIs(new Object[][] {
+ {"a", 4L},
+ {"b", 4L},
+ {"c", 1L}})
+ .close();
+ }
+
+ @Test public void testSubquery() {
+ start().sql("SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixProject(order_id=[$0])\n" +
+ " PhoenixJoin(condition=[AND(=($2, $6), =($4, $7))], joinType=[inner])\n" +
+ " PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n" +
+ " PhoenixAggregate(group=[{0}], EXPR$0=[MAX($1)])\n" +
+ " PhoenixProject(item_id0=[$6], QUANTITY=[$4])\n" +
+ " PhoenixJoin(condition=[=($6, $2)], joinType=[inner])\n" +
+ " PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n" +
+ " PhoenixAggregate(group=[{0}])\n" +
+ " PhoenixTableScan(table=[[phoenix, ORDERTABLE]], project=[[$2]])\n")
+ .close();
+ }
@Test public void testConnectUsingModel() throws Exception {
final Start start = new Start() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index b6eaf37..4962bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -9,6 +9,8 @@ import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlKind;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -17,6 +19,10 @@ import org.apache.phoenix.expression.ComparisonExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.function.AggregateFunction;
+import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.SumAggregateFunction;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -84,6 +90,36 @@ public class CalciteUtils {
});
}
+
+ private static final Map<String, FunctionFactory> FUNCTION_MAP = Maps
+ .newHashMapWithExpectedSize(ExpressionType.values().length);
+ private static final FunctionFactory getFactory(SqlFunction func) {
+ FunctionFactory fFactory = FUNCTION_MAP.get(func.getName());
+ if (fFactory == null) {
+ throw new UnsupportedOperationException("Unsupported SqlFunction: "
+ + func);
+ }
+ return fFactory;
+ }
+ static {
+ FUNCTION_MAP.put("COUNT", new FunctionFactory() {
+ @Override
+ public FunctionExpression newFunction(SqlFunction sqlFunc,
+ List<Expression> args) {
+ if (args.isEmpty()) {
+ args = Lists.asList(LiteralExpression.newConstant(1), new Expression[0]);
+ }
+ return new CountAggregateFunction(args);
+ }
+ });
+ FUNCTION_MAP.put("SUM", new FunctionFactory() {
+ @Override
+ public FunctionExpression newFunction(SqlFunction sqlFunc,
+ List<Expression> args) {
+ return new SumAggregateFunction(args);
+ }
+ });
+ }
static Expression toExpression(RexNode node, Implementor implementor) {
ExpressionFactory eFactory = getFactory(node);
@@ -91,7 +127,21 @@ public class CalciteUtils {
return expression;
}
+ static AggregateFunction toAggregateFunction(SqlAggFunction aggFunc, List<Integer> args, Implementor implementor) {
+ FunctionFactory fFactory = getFactory(aggFunc);
+ List<Expression> exprs = Lists.newArrayListWithExpectedSize(args.size());
+ for (Integer index : args) {
+ exprs.add(implementor.newColumnExpression(index));
+ }
+
+ return (AggregateFunction) (fFactory.newFunction(aggFunc, exprs));
+ }
+
public static interface ExpressionFactory {
public Expression newExpression(RexNode node, Implementor implementor);
}
+
+ public static interface FunctionFactory {
+ public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args);
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
index fb113cc..7a38f25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
@@ -1,15 +1,48 @@
package org.apache.phoenix.calcite;
+import java.sql.SQLException;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.ExpressionProjector;
+import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.AggregateFunction;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PVarchar;
+
+import com.google.common.collect.Lists;
/**
* Implementation of {@link org.apache.calcite.rel.core.Aggregate}
@@ -32,6 +65,11 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
throw new InvalidRelException("unsupported group type: " + getGroupType());
}
}
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
+ }
@Override
public PhoenixAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) {
@@ -47,7 +85,125 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
@Override
public QueryPlan implement(Implementor implementor) {
assert getConvention() == getInput().getConvention();
- implementor.visitInput(0, (PhoenixRel) getInput());
- throw new UnsupportedOperationException();
+ if (groupSets.size() > 1) {
+ throw new UnsupportedOperationException();
+ }
+
+ QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ TableRef tableRef = implementor.getTableRef();
+ ScanPlan basePlan = null;
+ if (plan instanceof ScanPlan) {
+ basePlan = (ScanPlan) plan;
+ } else if (plan instanceof HashJoinPlan) {
+ QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+ if (delegate instanceof ScanPlan) {
+ basePlan = (ScanPlan) delegate;
+ }
+ }
+ // TopN, we can not merge with the base plan.
+ if (!plan.getOrderBy().getOrderByExpressions().isEmpty() && plan.getLimit() != null) {
+ basePlan = null;
+ }
+ PhoenixStatement stmt = plan.getContext().getStatement();
+ StatementContext context;
+ try {
+ context = basePlan == null ? new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)) : basePlan.getContext();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<Integer> ordinals = groupSet.asList();
+ // TODO check order-preserving
+ String groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
+ // TODO sort group by keys. not sure if there is a way to avoid this sorting,
+ // otherwise we would have add an extra projection.
+ List<Expression> exprs = Lists.newArrayListWithExpectedSize(ordinals.size());
+ List<Expression> keyExprs = exprs;
+ for (int i = 0; i < ordinals.size(); i++) {
+ Expression expr = implementor.newColumnExpression(ordinals.get(i));
+ exprs.add(expr);
+ PDataType keyType = getKeyType(expr);
+ if (keyType == expr.getDataType()) {
+ continue;
+ }
+ if (keyExprs == exprs) {
+ keyExprs = Lists.newArrayList(exprs);
+ }
+ try {
+ keyExprs.set(i, CoerceExpression.create(expr, keyType));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(exprs).setKeyExpressions(keyExprs).build();
+
+ // TODO sort aggFuncs. same problem with group by key sorting.
+ List<SingleAggregateFunction> aggFuncs = Lists.newArrayList();
+ for (AggregateCall call : aggCalls) {
+ AggregateFunction aggFunc = CalciteUtils.toAggregateFunction(call.getAggregation(), call.getArgList(), implementor);
+ if (!(aggFunc instanceof SingleAggregateFunction)) {
+ throw new UnsupportedOperationException();
+ }
+ aggFuncs.add((SingleAggregateFunction) aggFunc);
+ }
+ int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
+ context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
+ ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
+ context.getAggregationManager().setAggregators(clientAggregators);
+
+ SelectStatement select = SelectStatement.SELECT_STAR;
+ RowProjector rowProjector = createRowProjector(keyExprs, aggFuncs);
+ if (basePlan == null) {
+ return new ClientAggregatePlan(context, select, tableRef, rowProjector, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan);
+ }
+
+ QueryPlan aggPlan = new AggregatePlan(context, select, basePlan.getTableRef(), rowProjector, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
+ if (plan instanceof ScanPlan)
+ return aggPlan;
+
+ HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
+ return HashJoinPlan.create(select, aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+ }
+
+ private static RowProjector createRowProjector(List<Expression> keyExprs, List<SingleAggregateFunction> aggFuncs) {
+ List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList();
+ for (int i = 0; i < keyExprs.size(); i++) {
+ Expression keyExpr = keyExprs.get(i);
+ RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExprs, i);
+ Expression expr = new RowKeyColumnExpression(keyExpr, accessor, keyExpr.getDataType());
+ columnProjectors.add(new ExpressionProjector(expr.toString(), "", expr, false));
+ }
+ for (SingleAggregateFunction aggFunc : aggFuncs) {
+ columnProjectors.add(new ExpressionProjector(aggFunc.toString(), "", aggFunc, false));
+ }
+ return new RowProjector(columnProjectors, 0, false);
+ }
+
+ private static PDataType getKeyType(Expression expression) {
+ PDataType type = expression.getDataType();
+ if (!expression.isNullable() || !type.isFixedWidth()) {
+ return type;
+ }
+ if (type.isCastableTo(PDecimal.INSTANCE)) {
+ return PDecimal.INSTANCE;
+ }
+ if (type.isCastableTo(PVarchar.INSTANCE)) {
+ return PVarchar.INSTANCE;
+ }
+ // This might happen if someone tries to group by an array
+ throw new IllegalStateException("Multiple occurrences of type " + type + " may not occur in a GROUP BY clause");
+ }
+
+ private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
+ int minNullableIndex = aggFuncs.size();
+ for (int i = 0; i < aggFuncs.size(); i++) {
+ SingleAggregateFunction aggFunc = aggFuncs.get(i);
+ if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
+ minNullableIndex = i;
+ break;
+ }
+ }
+ return minNullableIndex;
}
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
index 9c4f47e..c51308e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
@@ -66,7 +66,7 @@ public class PhoenixSchema implements Schema {
@Override
public Set<String> getTableNames() {
- return ImmutableSet.of("ATABLE", "ITEMTABLE", "SUPPLIERTABLE");
+ return ImmutableSet.of("ATABLE", "ITEMTABLE", "SUPPLIERTABLE", "ORDERTABLE", "CUSTOMERTABLE");
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 4d50ba0..f487533 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -36,6 +36,10 @@ public abstract class DelegateQueryPlan implements QueryPlan {
public DelegateQueryPlan(QueryPlan delegate) {
this.delegate = delegate;
}
+
+ public QueryPlan getDelegate() {
+ return delegate;
+ }
@Override
public StatementContext getContext() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index aea075d..0f6edc3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -110,6 +110,14 @@ public class HashJoinPlan extends DelegateQueryPlan {
this.subPlans = subPlans;
this.recompileWhereClause = recompileWhereClause;
}
+
+ public HashJoinInfo getJoinInfo() {
+ return this.joinInfo;
+ }
+
+ public SubPlan[] getSubPlans() {
+ return this.subPlans;
+ }
@Override
public ResultIterator iterator() throws SQLException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 220d465..2b7a62b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -194,8 +194,8 @@ public class TestUtil {
public static final String JOIN_ITEM_TABLE = "ItemTable";
public static final String JOIN_SUPPLIER_TABLE = "SupplierTable";
public static final String JOIN_COITEM_TABLE = "CoitemTable";
- public static final String JOIN_ORDER_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_ORDER_TABLE + '"';
- public static final String JOIN_CUSTOMER_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_CUSTOMER_TABLE + '"';
+ public static final String JOIN_ORDER_TABLE_FULL_NAME = JOIN_ORDER_TABLE; // '"' + JOIN_SCHEMA + "\".\"" + JOIN_ORDER_TABLE + '"';
+ public static final String JOIN_CUSTOMER_TABLE_FULL_NAME = JOIN_CUSTOMER_TABLE; // '"' + JOIN_SCHEMA + "\".\"" + JOIN_CUSTOMER_TABLE + '"';
public static final String JOIN_ITEM_TABLE_FULL_NAME = JOIN_ITEM_TABLE; //'"' + JOIN_SCHEMA + "\".\"" + JOIN_ITEM_TABLE + '"';
public static final String JOIN_SUPPLIER_TABLE_FULL_NAME = JOIN_SUPPLIER_TABLE; //'"' + JOIN_SCHEMA + "\".\"" + JOIN_SUPPLIER_TABLE + '"';
public static final String JOIN_COITEM_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_COITEM_TABLE + '"';