You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/01 18:30:30 UTC

phoenix git commit: First aggregate query passed

Repository: phoenix
Updated Branches:
  refs/heads/calcite 208150098 -> 8097c8b9d


First aggregate query passed


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8097c8b9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8097c8b9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8097c8b9

Branch: refs/heads/calcite
Commit: 8097c8b9deb083bece69f170be319b95d3930df5
Parents: 2081500
Author: maryannxue <we...@intel.com>
Authored: Wed Apr 1 12:30:15 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Wed Apr 1 12:30:15 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java |  28 ++++
 .../apache/phoenix/calcite/CalciteUtils.java    |  50 ++++++
 .../phoenix/calcite/PhoenixAggregate.java       | 160 ++++++++++++++++++-
 .../apache/phoenix/calcite/PhoenixSchema.java   |   2 +-
 .../phoenix/execute/DelegateQueryPlan.java      |   4 +
 .../apache/phoenix/execute/HashJoinPlan.java    |   8 +
 .../java/org/apache/phoenix/util/TestUtil.java  |   4 +-
 7 files changed, 251 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index a9ad76b..70d44f6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -15,6 +15,7 @@ import java.sql.*;
 import java.util.List;
 
 import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
 import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
 import static org.junit.Assert.*;
 
@@ -275,6 +276,33 @@ public class CalciteTest extends BaseClientManagedTimeIT {
                           {"00C923122312312", "c", "00D300000000XHP"}})
                 .close();
     }
+    
+    // GROUP BY with COUNT over ATABLE: asserts both the explain plan and the returned rows.
+    @Test public void testAggregate() {
+        final String sql = "select a_string, count(entity_id) from atable group by a_string";
+        final String expectedPlan = "PhoenixToEnumerableConverter\n" +
+                           "  PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+                           "    PhoenixTableScan(table=[[phoenix, ATABLE]], project=[[$2]])\n";
+        final Object[][] expectedRows = {{"a", 4L}, {"b", 4L}, {"c", 1L}};
+        start().sql(sql).explainIs(expectedPlan)
+                .resultIs(expectedRows)
+                .close();
+    }
+    
+    // Correlated scalar subquery on the order table: only the explain plan is asserted.
+    @Test public void testSubquery() {
+        final String expectedPlan = "PhoenixToEnumerableConverter\n" +
+                          "  PhoenixProject(order_id=[$0])\n" +
+                          "    PhoenixJoin(condition=[AND(=($2, $6), =($4, $7))], joinType=[inner])\n" +
+                          "      PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n" +
+                          "      PhoenixAggregate(group=[{0}], EXPR$0=[MAX($1)])\n" +
+                          "        PhoenixProject(item_id0=[$6], QUANTITY=[$4])\n" +
+                          "          PhoenixJoin(condition=[=($6, $2)], joinType=[inner])\n" +
+                          "            PhoenixTableScan(table=[[phoenix, ORDERTABLE]])\n" +
+                          "            PhoenixAggregate(group=[{0}])\n" +
+                          "              PhoenixTableScan(table=[[phoenix, ORDERTABLE]], project=[[$2]])\n";
+        start().sql("SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")").explainIs(expectedPlan).close();
+    }
 
     @Test public void testConnectUsingModel() throws Exception {
         final Start start = new Start() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index b6eaf37..4962bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -9,6 +9,8 @@ import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -17,6 +19,10 @@ import org.apache.phoenix.expression.ComparisonExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.function.AggregateFunction;
+import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.SumAggregateFunction;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -84,6 +90,36 @@ public class CalciteUtils {
 			
 		});
 	}
+	
+    private static final Map<String, FunctionFactory> FUNCTION_MAP = Maps
+            .newHashMapWithExpectedSize(ExpressionType.values().length);
+    /** Looks up the factory registered under the function's name; unknown names are rejected. */
+    private static final FunctionFactory getFactory(SqlFunction func) {
+        final FunctionFactory registered = FUNCTION_MAP.get(func.getName());
+        if (registered != null) {
+            return registered;
+        }
+        throw new UnsupportedOperationException("Unsupported SqlFunction: " + func);
+    }
+    static {
+        // A COUNT called with an empty argument list is evaluated as a count over the literal 1.
+        final FunctionFactory countFactory = new FunctionFactory() {
+            @Override
+            public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args) {
+                List<Expression> countArgs = args.isEmpty()
+                        ? Lists.asList(LiteralExpression.newConstant(1), new Expression[0])
+                        : args;
+                return new CountAggregateFunction(countArgs);
+            }
+        };
+        FUNCTION_MAP.put("COUNT", countFactory);
+        FUNCTION_MAP.put("SUM", new FunctionFactory() {
+            @Override
+            public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args) {
+                return new SumAggregateFunction(args);
+            }
+        });
+    }
 
 	static Expression toExpression(RexNode node, Implementor implementor) {
 		ExpressionFactory eFactory = getFactory(node);
@@ -91,7 +127,21 @@ public class CalciteUtils {
 		return expression;
 	}
 	
+	/** Builds the Phoenix aggregate for a Calcite aggregate function; each argument ordinal becomes a column expression. */
+	static AggregateFunction toAggregateFunction(SqlAggFunction aggFunc, List<Integer> args, Implementor implementor) {
+	    final FunctionFactory factory = getFactory(aggFunc); // resolved first, so unregistered functions are rejected before any argument is translated
+	    final List<Expression> argExprs = Lists.newArrayListWithExpectedSize(args.size());
+	    for (Integer ordinal : args) {
+	        argExprs.add(implementor.newColumnExpression(ordinal));
+	    }
+	    return (AggregateFunction) factory.newFunction(aggFunc, argExprs);
+	}
+	
 	public static interface ExpressionFactory {
 		public Expression newExpression(RexNode node, Implementor implementor);
 	}
+	
+	public static interface FunctionFactory { // creates the Phoenix FunctionExpression for a Calcite SqlFunction
+	    public FunctionExpression newFunction(SqlFunction sqlFunc, List<Expression> args); // args: the function's argument expressions, already translated to Phoenix
+	}
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
index fb113cc..7a38f25 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java
@@ -1,15 +1,48 @@
 package org.apache.phoenix.calcite;
 
+import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.ExpressionProjector;
+import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.AggregateFunction;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PVarchar;
+
+import com.google.common.collect.Lists;
 
 /**
  * Implementation of {@link org.apache.calcite.rel.core.Aggregate}
@@ -32,6 +65,11 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
                 throw new InvalidRelException("unsupported group type: " + getGroupType());
         }
     }
+    
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) { // superclass cost scaled by PHOENIX_FACTOR
+        return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
+    }
 
     @Override
     public PhoenixAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) {
@@ -47,7 +85,125 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel {
     @Override
     public QueryPlan implement(Implementor implementor) {
         assert getConvention() == getInput().getConvention();
-        implementor.visitInput(0, (PhoenixRel) getInput());
-        throw new UnsupportedOperationException();
+        if (groupSets.size() > 1) { // more than one grouping set is not supported
+            throw new UnsupportedOperationException();
+        }
+        
+        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        TableRef tableRef = implementor.getTableRef();
+        ScanPlan basePlan = null; // scan plan the aggregation can be merged into, if any
+        if (plan instanceof ScanPlan) {
+            basePlan = (ScanPlan) plan;
+        } else if (plan instanceof HashJoinPlan) {
+            QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+            if (delegate instanceof ScanPlan) {
+                basePlan = (ScanPlan) delegate;
+            }
+        }
+        // TopN (ORDER BY together with LIMIT): we cannot merge with the base plan.
+        if (!plan.getOrderBy().getOrderByExpressions().isEmpty() && plan.getLimit() != null) {
+            basePlan = null;
+        }
+        PhoenixStatement stmt = plan.getContext().getStatement();
+        StatementContext context;
+        try {
+            context = basePlan == null ? new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)) : basePlan.getContext();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }        
+        
+        List<Integer> ordinals = groupSet.asList();
+        // TODO check whether the grouping is order-preserving; it is always treated as unordered for now
+        String groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
+        // TODO sort group-by keys. Not sure if there is a way to avoid this sorting;
+        //      otherwise we would have to add an extra projection.
+        List<Expression> exprs = Lists.newArrayListWithExpectedSize(ordinals.size());
+        List<Expression> keyExprs = exprs; // shares the exprs list until a key first needs coercion
+        for (int i = 0; i < ordinals.size(); i++) {
+            Expression expr = implementor.newColumnExpression(ordinals.get(i));
+            exprs.add(expr);
+            PDataType keyType = getKeyType(expr);
+            if (keyType == expr.getDataType()) {
+                continue;
+            }
+            if (keyExprs == exprs) { // copy on first difference so exprs keeps the uncoerced expressions
+                keyExprs = Lists.newArrayList(exprs);
+            }
+            try {
+                keyExprs.set(i, CoerceExpression.create(expr, keyType));
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(exprs).setKeyExpressions(keyExprs).build();
+        
+        // TODO sort aggFuncs; same problem as with the group-by key sorting.
+        List<SingleAggregateFunction> aggFuncs = Lists.newArrayList();
+        for (AggregateCall call : aggCalls) {
+            AggregateFunction aggFunc = CalciteUtils.toAggregateFunction(call.getAggregation(), call.getArgList(), implementor);
+            if (!(aggFunc instanceof SingleAggregateFunction)) {
+                throw new UnsupportedOperationException();
+            }
+            aggFuncs.add((SingleAggregateFunction) aggFunc);
+        }
+        int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
+        context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
+        ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
+        context.getAggregationManager().setAggregators(clientAggregators);
+        
+        SelectStatement select = SelectStatement.SELECT_STAR;
+        RowProjector rowProjector = createRowProjector(keyExprs, aggFuncs);
+        if (basePlan == null) { // nothing to merge into: wrap the input plan in a ClientAggregatePlan
+            return new ClientAggregatePlan(context, select, tableRef, rowProjector, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan);
+        }
+        
+        QueryPlan aggPlan = new AggregatePlan(context, select, basePlan.getTableRef(), rowProjector, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
+        if (plan instanceof ScanPlan)
+            return aggPlan;
+        
+        HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; // only remaining case: basePlan was the delegate of a HashJoinPlan
+        return HashJoinPlan.create(select, aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+    }
+    
+    /** Projects every group-by key (read back through a row-key accessor) followed by every aggregate function. */
+    private static RowProjector createRowProjector(List<Expression> keyExprs, List<SingleAggregateFunction> aggFuncs) {
+        List<ColumnProjector> projectors = Lists.<ColumnProjector>newArrayList();
+        int position = 0;
+        for (Expression key : keyExprs) {
+            Expression keyColumn = new RowKeyColumnExpression(key, new RowKeyValueAccessor(keyExprs, position++), key.getDataType());
+            projectors.add(new ExpressionProjector(keyColumn.toString(), "", keyColumn, false));
+        }
+        for (SingleAggregateFunction aggregate : aggFuncs) {
+            projectors.add(new ExpressionProjector(aggregate.toString(), "", aggregate, false));
+        }
+        return new RowProjector(projectors, 0, false);
+    }
+    
+    /** Returns the key type for a group-by expression: a nullable fixed-width type maps to PDecimal or PVarchar. */
+    private static PDataType getKeyType(Expression expression) {
+        final PDataType type = expression.getDataType();
+        if (!(expression.isNullable() && type.isFixedWidth())) {
+            return type;
+        }
+        for (PDataType candidate : new PDataType[] { PDecimal.INSTANCE, PVarchar.INSTANCE }) {
+            if (type.isCastableTo(candidate)) {
+                return candidate;
+            }
+        }
+        // This might happen if someone tries to group by an array
+        throw new IllegalStateException("Multiple occurrences of type " + type + " may not occur in a GROUP BY clause");
+    }
+    
+    /** Returns the index of the first aggregate reporting isNullable(), or aggFuncs.size() when none does. */
+    private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
+        for (int index = 0; index < aggFuncs.size(); index++) {
+            SingleAggregateFunction candidate = aggFuncs.get(index);
+            if (isUngroupedAggregation ? candidate.getAggregator().isNullable() : candidate.getAggregatorExpression().isNullable()) {
+                return index;
+            }
+        }
+        // Ungrouped aggregation consults the aggregator itself; grouped aggregation consults its expression.
+        return aggFuncs.size();
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
index 9c4f47e..c51308e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
@@ -66,7 +66,7 @@ public class PhoenixSchema implements Schema {
 
     @Override
     public Set<String> getTableNames() {
-        return ImmutableSet.of("ATABLE", "ITEMTABLE", "SUPPLIERTABLE");
+        return ImmutableSet.of("ATABLE", "ITEMTABLE", "SUPPLIERTABLE", "ORDERTABLE", "CUSTOMERTABLE"); // hard-coded list of table names
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 4d50ba0..f487533 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -36,6 +36,10 @@ public abstract class DelegateQueryPlan implements QueryPlan {
     public DelegateQueryPlan(QueryPlan delegate) {
         this.delegate = delegate;
     }
+    
+    public QueryPlan getDelegate() { // exposes the plan this instance delegates to
+        return delegate;
+    }
 
     @Override
     public StatementContext getContext() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index aea075d..0f6edc3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -110,6 +110,14 @@ public class HashJoinPlan extends DelegateQueryPlan {
         this.subPlans = subPlans;
         this.recompileWhereClause = recompileWhereClause;
     }
+    
+    public HashJoinInfo getJoinInfo() { // accessor for this plan's joinInfo field
+        return this.joinInfo;
+    }
+    
+    public SubPlan[] getSubPlans() { // returns the internal array itself (no copy)
+        return this.subPlans;
+    }
 
     @Override
     public ResultIterator iterator() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8097c8b9/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 220d465..2b7a62b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -194,8 +194,8 @@ public class TestUtil {
     public static final String JOIN_ITEM_TABLE = "ItemTable";
     public static final String JOIN_SUPPLIER_TABLE = "SupplierTable";
     public static final String JOIN_COITEM_TABLE = "CoitemTable";
-    public static final String JOIN_ORDER_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_ORDER_TABLE + '"';
-    public static final String JOIN_CUSTOMER_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_CUSTOMER_TABLE + '"';
+    public static final String JOIN_ORDER_TABLE_FULL_NAME = JOIN_ORDER_TABLE; // '"' + JOIN_SCHEMA + "\".\"" + JOIN_ORDER_TABLE + '"';
+    public static final String JOIN_CUSTOMER_TABLE_FULL_NAME = JOIN_CUSTOMER_TABLE; // '"' + JOIN_SCHEMA + "\".\"" + JOIN_CUSTOMER_TABLE + '"';
     public static final String JOIN_ITEM_TABLE_FULL_NAME = JOIN_ITEM_TABLE; //'"' + JOIN_SCHEMA + "\".\"" + JOIN_ITEM_TABLE + '"';
     public static final String JOIN_SUPPLIER_TABLE_FULL_NAME = JOIN_SUPPLIER_TABLE; //'"' + JOIN_SCHEMA + "\".\"" + JOIN_SUPPLIER_TABLE + '"';
     public static final String JOIN_COITEM_TABLE_FULL_NAME = '"' + JOIN_SCHEMA + "\".\"" + JOIN_COITEM_TABLE + '"';