You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/14 18:01:14 UTC

phoenix git commit: Add PhoenixCompactClientSort for merging OrderBy into AggregatePlan

Repository: phoenix
Updated Branches:
  refs/heads/calcite 6a8c37629 -> fa60f8ca7


Add PhoenixCompactClientSort for merging OrderBy into AggregatePlan


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fa60f8ca
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fa60f8ca
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fa60f8ca

Branch: refs/heads/calcite
Commit: fa60f8ca7c9dea4333fcc55d196385b885bafec5
Parents: 6a8c376
Author: maryannxue <we...@intel.com>
Authored: Tue Apr 14 12:00:54 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Tue Apr 14 12:00:54 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java |  6 ++--
 .../phoenix/calcite/PhoenixClientSort.java      |  2 +-
 .../calcite/PhoenixCompactClientSort.java       | 33 ++++++++++++++------
 .../phoenix/calcite/PhoenixServerAggregate.java | 13 ++++----
 .../phoenix/calcite/PhoenixServerSort.java      | 15 ++++-----
 .../org/apache/phoenix/calcite/PhoenixSort.java |  8 +++--
 .../phoenix/calcite/PhoenixTableScan.java       |  2 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |  8 +++--
 .../phoenix/execute/TupleProjectionPlan.java    |  8 +++++
 9 files changed, 63 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index df870e1..9d2dbea 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -385,8 +385,8 @@ public class CalciteTest extends BaseClientManagedTimeIT {
         
         start().sql("select count(entity_id), a_string from atable group by a_string order by count(entity_id), a_string desc")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixClientSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n" +
-                           "    PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+                           "  PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+                           "    PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
                            "      PhoenixServerAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
                            "        PhoenixServerProject(A_STRING=[$2])\n" +
                            "          PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
@@ -398,7 +398,7 @@ public class CalciteTest extends BaseClientManagedTimeIT {
         
         start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name order by count(\"item_id\"), s.name desc")
                 .explainIs("PhoenixToEnumerableConverter\n" +
-                           "  PhoenixClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
+                           "  PhoenixCompactClientSort(sort0=[$1], sort1=[$0], dir0=[ASC], dir1=[DESC])\n" +
                            "    PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
                            "      PhoenixServerProject(NAME=[$2])\n" +
                            "        PhoenixServerJoin(condition=[=($1, $0)], joinType=[inner])\n" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
index 6bb67fb..417bda8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixClientSort.java
@@ -56,7 +56,7 @@ public class PhoenixClientSort extends PhoenixSort {
             throw new RuntimeException(e);
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor);
+        OrderBy orderBy = super.getOrderBy(implementor, null);
         Integer limit = super.getLimit(implementor);
         
         return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, limit, null, orderBy, plan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
index 7037598..40a0697 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixCompactClientSort.java
@@ -11,7 +11,8 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.AggregatePlan;
-import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
 
 public class PhoenixCompactClientSort extends PhoenixSort {
 
@@ -40,26 +41,38 @@ public class PhoenixCompactClientSort extends PhoenixSort {
             throw new UnsupportedOperationException();
             
         QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
-        assert (plan instanceof AggregatePlan || plan instanceof HashJoinPlan) 
-                && plan.getLimit() == null;
+        assert plan instanceof TupleProjectionPlan;
+        
+        // PhoenixServerAggregate wraps the AggregatePlan with a TupleProjectionPlan,
+        // so we need to unwrap the TupleProjectionPlan.
+        TupleProjectionPlan tupleProjectionPlan = (TupleProjectionPlan) plan;
+        assert tupleProjectionPlan.getPostFilter() == null;
+        QueryPlan innerPlan = tupleProjectionPlan.getDelegate();
+        TupleProjector tupleProjector = tupleProjectionPlan.getTupleProjector();
+        assert (innerPlan instanceof AggregatePlan 
+                    || innerPlan instanceof HashJoinPlan)
+                && innerPlan.getLimit() == null; 
         
         AggregatePlan basePlan;
-        if (plan instanceof AggregatePlan) {
-            basePlan = (AggregatePlan) plan;
+        HashJoinPlan hashJoinPlan = null;
+        if (innerPlan instanceof AggregatePlan) {
+            basePlan = (AggregatePlan) innerPlan;
         } else {
-            QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+            hashJoinPlan = (HashJoinPlan) innerPlan;
+            QueryPlan delegate = hashJoinPlan.getDelegate();
             assert delegate instanceof AggregatePlan;
             basePlan = (AggregatePlan) delegate;
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor);
+        OrderBy orderBy = super.getOrderBy(implementor, tupleProjector);
         Integer limit = super.getLimit(implementor);
         QueryPlan newPlan = AggregatePlan.create((AggregatePlan) basePlan, orderBy, limit);
         
-        if (plan instanceof HashJoinPlan) {        
-            HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
-            newPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+        if (hashJoinPlan != null) {        
+            newPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
         }
+        // Recover the wrapping of TupleProjectionPlan
+        newPlan = new TupleProjectionPlan(newPlan, tupleProjector, null);
         return newPlan;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
index a535a35..bdff153 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerAggregate.java
@@ -18,7 +18,6 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.parse.SelectStatement;
 
 public class PhoenixServerAggregate extends PhoenixAggregate {
 
@@ -45,14 +44,17 @@ public class PhoenixServerAggregate extends PhoenixAggregate {
         assert getConvention() == getInput().getConvention();
         
         QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
-        assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan) 
+        assert (plan instanceof ScanPlan 
+                    || plan instanceof HashJoinPlan)
                 && plan.getLimit() == null;
         
         ScanPlan basePlan;
+        HashJoinPlan hashJoinPlan = null;
         if (plan instanceof ScanPlan) {
             basePlan = (ScanPlan) plan;
         } else {
-            QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+            hashJoinPlan = (HashJoinPlan) plan;
+            QueryPlan delegate = hashJoinPlan.getDelegate();
             assert delegate instanceof ScanPlan;
             basePlan = (ScanPlan) delegate;
         }
@@ -62,9 +64,8 @@ public class PhoenixServerAggregate extends PhoenixAggregate {
         super.serializeAggregators(implementor, context, groupBy.isEmpty());
         
         QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null);
-        if (plan instanceof HashJoinPlan) {        
-            HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
-            aggPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+        if (hashJoinPlan != null) {        
+            aggPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
         }
         
         return PhoenixAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
index 1538309..17516dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixServerSort.java
@@ -13,7 +13,6 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.parse.SelectStatement;
 
 public class PhoenixServerSort extends PhoenixSort {
 
@@ -42,19 +41,22 @@ public class PhoenixServerSort extends PhoenixSort {
             throw new UnsupportedOperationException();
             
         QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
-        assert (plan instanceof ScanPlan || plan instanceof HashJoinPlan) 
+        assert (plan instanceof ScanPlan 
+                    || plan instanceof HashJoinPlan) 
                 && plan.getLimit() == null;
         
         ScanPlan basePlan;
+        HashJoinPlan hashJoinPlan = null;
         if (plan instanceof ScanPlan) {
             basePlan = (ScanPlan) plan;
         } else {
-            QueryPlan delegate = ((HashJoinPlan) plan).getDelegate();
+            hashJoinPlan = (HashJoinPlan) plan;
+            QueryPlan delegate = hashJoinPlan.getDelegate();
             assert delegate instanceof ScanPlan;
             basePlan = (ScanPlan) delegate;
         }
         
-        OrderBy orderBy = super.getOrderBy(implementor);
+        OrderBy orderBy = super.getOrderBy(implementor, null);
         Integer limit = super.getLimit(implementor);
         QueryPlan newPlan;
         try {
@@ -63,9 +65,8 @@ public class PhoenixServerSort extends PhoenixSort {
             throw new RuntimeException(e);
         }
         
-        if (plan instanceof HashJoinPlan) {        
-            HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
-            newPlan = HashJoinPlan.create((SelectStatement) (plan.getStatement()), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
+        if (hashJoinPlan != null) {        
+            newPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), newPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
         }
         return newPlan;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
index 1b28973..2338b5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSort.java
@@ -12,9 +12,11 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexNode;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.schema.SortOrder;
+
 import com.google.common.collect.Lists;
 
 /**
@@ -31,12 +33,14 @@ abstract public class PhoenixSort extends Sort implements PhoenixRel {
         assert getConvention() == PhoenixRel.CONVENTION;
     }
     
-    protected OrderBy getOrderBy(Implementor implementor) {
+    protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector) {
         assert !getCollation().getFieldCollations().isEmpty();
         
         List<OrderByExpression> orderByExpressions = Lists.newArrayList();
         for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) {
-            Expression expr = implementor.newColumnExpression(fieldCollation.getFieldIndex());
+            Expression expr = tupleProjector == null ? 
+                      implementor.newColumnExpression(fieldCollation.getFieldIndex()) 
+                    : tupleProjector.getExpressions()[fieldCollation.getFieldIndex()];
             boolean isAscending = fieldCollation.getDirection() == Direction.ASCENDING;
             if (expr.getSortOrder() == SortOrder.DESC) {
                 isAscending = !isAscending;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
index c5db7f1..6044739 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java
@@ -81,7 +81,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
         planner.addRule(PhoenixServerSortRule.SORT_SCAN);
         planner.addRule(PhoenixServerSortRule.SORT_SERVERJOIN);
         planner.addRule(PhoenixServerSortRule.SORT_SERVERPROJECT);
-        // TODO planner.addRule(PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE);
+        planner.addRule(PhoenixCompactClientSortRule.SORT_SERVERAGGREGATE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 0f6edc3..cc2a244 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -54,7 +54,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
@@ -118,6 +117,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
     public SubPlan[] getSubPlans() {
         return this.subPlans;
     }
+    
+    @Override
+    public Integer getLimit() {
+        return this.joinInfo == null ? null : this.joinInfo.getLimit();
+    }
 
     @Override
     public ResultIterator iterator() throws SQLException {
@@ -232,7 +236,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     }
 
     @Override
-    public FilterableStatement getStatement() {
+    public SelectStatement getStatement() {
         return statement;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/fa60f8ca/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index c9cbd15..c2984b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -40,6 +40,14 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
         this.tupleProjector = tupleProjector;
         this.postFilter = postFilter;
     }
+    
+    public TupleProjector getTupleProjector() {
+        return this.tupleProjector;
+    }
+    
+    public Expression getPostFilter() {
+        return this.postFilter;
+    }
 
     @Override
     public ExplainPlan getExplainPlan() throws SQLException {