You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/16 04:26:48 UTC

phoenix git commit: PHOENIX-1859 Implement PhoenixLimit in Phoenix/Calcite Integration

Repository: phoenix
Updated Branches:
  refs/heads/calcite a0aca7f51 -> 2368ea6d3


PHOENIX-1859 Implement PhoenixLimit in Phoenix/Calcite Integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2368ea6d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2368ea6d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2368ea6d

Branch: refs/heads/calcite
Commit: 2368ea6d3a8375a35c492d62c6383e9a9eca9ee6
Parents: a0aca7f
Author: maryannxue <we...@intel.com>
Authored: Wed Apr 15 22:26:34 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Wed Apr 15 22:26:34 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java | 58 +++++++++++++++
 .../apache/phoenix/calcite/CalciteUtils.java    | 15 ++++
 .../apache/phoenix/calcite/PhoenixTable.java    |  2 +-
 .../calcite/metadata/PhoenixRelMdRowCount.java  | 21 ++++++
 .../metadata/PhoenixRelMetadataProvider.java    | 15 ++++
 .../calcite/rel/PhoenixAbstractSort.java        | 37 +++++++---
 .../phoenix/calcite/rel/PhoenixClientJoin.java  |  4 +-
 .../phoenix/calcite/rel/PhoenixLimit.java       | 66 +++++++++++++++++
 .../apache/phoenix/calcite/rel/PhoenixRel.java  |  5 ++
 .../phoenix/calcite/rel/PhoenixServerJoin.java  |  3 +-
 .../phoenix/calcite/rel/PhoenixTableScan.java   | 35 ++++++---
 .../calcite/rules/PhoenixAddScanLimitRule.java  | 74 ++++++++++++++++++++
 .../calcite/rules/PhoenixConverterRules.java    | 46 +++++++++---
 .../rules/PhoenixFilterScanMergeRule.java       |  8 +--
 .../apache/phoenix/execute/HashJoinPlan.java    |  5 --
 15 files changed, 353 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index 6d980e1..05dcc9e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -517,6 +517,64 @@ public class CalciteTest extends BaseClientManagedTimeIT {
                 .close();
     }
     
+    @Test public void testLimit() { // Case 1: LIMIT over a plain scan; the fetch also shows up on the scan as statelessFetch
+        start().sql("select organization_id, entity_id, a_string from aTable limit 5")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixLimit(fetch=[5])\n" +
+                           "    PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, ATABLE]], statelessFetch=[5])\n")
+                .resultIs(new Object[][] {
+                          {"00D300000000XHP", "00A123122312312", "a"}, 
+                          {"00D300000000XHP", "00A223122312312", "a"}, 
+                          {"00D300000000XHP", "00A323122312312", "a"}, 
+                          {"00D300000000XHP", "00A423122312312", "a"}, 
+                          {"00D300000000XHP", "00B523122312312", "b"}})
+                .close();
+        // Case 2: LIMIT over GROUP BY; the limit sits above the server aggregate and the scan carries no statelessFetch
+        start().sql("select count(entity_id), a_string from atable group by a_string limit 2")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+                           "    PhoenixLimit(fetch=[2])\n" +
+                           "      PhoenixServerAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
+                           "        PhoenixServerProject(A_STRING=[$2])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+                .resultIs(new Object[][] {
+                          {4L, "a"},
+                          {4L, "b"}})
+                .close();
+        // Case 3: LIMIT over GROUP BY on a join; the limit sits above the server aggregate, with a server join beneath it
+        start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name limit 3")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixLimit(fetch=[3])\n" +
+                           "    PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+                           "      PhoenixServerProject(NAME=[$2])\n" +
+                           "        PhoenixServerJoin(condition=[=($1, $0)], joinType=[inner])\n" +
+                           "          PhoenixServerProject(supplier_id=[$5])\n" +
+                           "            PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
+                           "          PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
+                           "            PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"S1", 2L},
+                          {"S2", 2L},
+                          {"S5", 1L}})
+                .close();
+        // Case 4: LIMIT over a join; the limit sits above the server join and below the client project
+        start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" limit 3")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixClientProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" +
+                           "    PhoenixLimit(fetch=[3])\n" +
+                           "      PhoenixServerJoin(condition=[=($2, $3)], joinType=[inner])\n" +
+                           "        PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
+                           "        PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
+                .resultIs(new Object[][] {
+                          {"0000000001", "T1", "0000000001", "S1"}, 
+                          {"0000000002", "T2", "0000000001", "S1"}, 
+                          {"0000000003", "T3", "0000000002", "S2"}})
+                .close();
+    }
+    
     @Test public void testSubquery() {
         start().sql("SELECT \"order_id\", quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")")
                .explainIs("PhoenixToEnumerableConverter\n" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index 5c4d7ab..a3fbce8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -155,6 +155,21 @@ public class CalciteUtils {
 	    return (AggregateFunction) (fFactory.newFunction(aggFunc, exprs));
 	}
 	
+	public static Object evaluateStatelessExpression(RexNode node) { // returns the evaluated value of node, or null if it is stateful or cannot be evaluated
+	    try {
+	        Expression expression = toExpression(node, null); // converted without an Implementor (null is passed)
+	        if (expression.isStateless()) {
+	            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+	            expression.evaluate(null, ptr);
+	            return expression.getDataType().toObject(ptr);
+	        }
+	    } catch (Exception e) {
+	        // Any failure to convert or evaluate the expression is treated the same as "not stateless": fall through and return null.
+	    }
+	    // Reached when the expression is stateful or an exception was swallowed above.
+	    return null;
+	}
+	
 	public static interface ExpressionFactory {
 		public Expression newExpression(RexNode node, Implementor implementor);
 	}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
index 1e74d6c..e47521b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
@@ -72,7 +72,7 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable {
     @Override
     public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
         final RelOptCluster cluster = context.getCluster();
-        return new PhoenixTableScan(cluster, cluster.traitSetOf(PhoenixRel.CONVENTION), relOptTable, null);
+        return new PhoenixTableScan(cluster, cluster.traitSetOf(PhoenixRel.CONVENTION), relOptTable, null, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdRowCount.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdRowCount.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdRowCount.java
new file mode 100644
index 0000000..797867d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdRowCount.java
@@ -0,0 +1,21 @@
+package org.apache.phoenix.calcite.metadata;
+
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.phoenix.calcite.rel.PhoenixAbstractSort;
+import org.apache.phoenix.calcite.rel.PhoenixLimit;
+/** Row-count metadata handlers for Phoenix sort and limit nodes; each handler returns the node's own getRows(). */
+public class PhoenixRelMdRowCount {
+    public static final RelMetadataProvider SOURCE = // reflective provider for BuiltInMethod.ROW_COUNT, backed by an instance of this class
+            ReflectiveRelMetadataProvider.reflectiveSource(
+                BuiltInMethod.ROW_COUNT.method, new PhoenixRelMdRowCount());
+    /** Row count for a PhoenixAbstractSort, taken from rel.getRows(). */
+    public Double getRowCount(PhoenixAbstractSort rel) {
+        return rel.getRows();
+      }
+    /** Row count for a PhoenixLimit, taken from rel.getRows(). */
+    public Double getRowCount(PhoenixLimit rel) {
+        return rel.getRows();
+      }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java
new file mode 100644
index 0000000..ea37251
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java
@@ -0,0 +1,15 @@
+package org.apache.phoenix.calcite.metadata;
+
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
+import com.google.common.collect.ImmutableList;
+/** Metadata provider for Phoenix rels: chains PhoenixRelMdRowCount.SOURCE with Calcite's DefaultRelMetadataProvider. */
+public class PhoenixRelMetadataProvider extends ChainedRelMetadataProvider {
+    /** Builds the chain; the Phoenix row-count source is listed ahead of the default provider. */
+    public PhoenixRelMetadataProvider() {
+        super(ImmutableList.of(
+                PhoenixRelMdRowCount.SOURCE, 
+                new DefaultRelMetadataProvider()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
index 708b5ae..4598bf5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
@@ -3,6 +3,8 @@ package org.apache.phoenix.calcite.rel;
 import java.util.List;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelFieldCollation;
@@ -10,8 +12,9 @@ import org.apache.calcite.rel.RelFieldCollation.Direction;
 import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.calcite.util.Util;
 import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.execute.TupleProjector;
@@ -30,14 +33,35 @@ import com.google.common.collect.Lists;
 abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
     protected static final double CLIENT_MERGE_FACTOR = 0.5;
     
+    private final Integer statelessFetch;
+    
     public PhoenixAbstractSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
         super(cluster, traits, child, collation, offset, fetch);
+        Object value = fetch == null ? null : CalciteUtils.evaluateStatelessExpression(fetch);
+        this.statelessFetch = value == null ? null : ((Number) value).intValue();        
         assert getConvention() == PhoenixRel.CONVENTION;
+        assert !getCollation().getFieldCollations().isEmpty();
+    }
+
+    @Override 
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        // Mirrors the superclass's computeSelfCost(), but uses the input's row count, since this node's own row count may be capped by the fetch (see getRows()).
+        double rowCount = RelMetadataQuery.getRowCount(getInput());
+        double bytesPerRow = getRowType().getFieldCount() * 4;
+        return planner.getCostFactory().makeCost(
+                Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
+    }
+
+    @Override 
+    public double getRows() {
+        double rows = super.getRows();        
+        if (this.statelessFetch == null)
+            return rows;
+
+        return Math.min(this.statelessFetch, rows);
     }
     
     protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector) {
-        assert !getCollation().getFieldCollations().isEmpty();
-        
         List<OrderByExpression> orderByExpressions = Lists.newArrayList();
         for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) {
             Expression expr = tupleProjector == null ? 
@@ -57,12 +81,9 @@ abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
         if (this.fetch == null)
             return null;
         
-        Expression expr = CalciteUtils.toExpression(this.fetch, implementor);
-        if (!expr.isStateless())
+        if (this.statelessFetch == null)
             throw new UnsupportedOperationException("Stateful limit expression not supported");
 
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        expr.evaluate(null, ptr);
-        return ((Number) (expr.getDataType().toObject(ptr))).intValue();
+        return this.statelessFetch;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
index 4c7c6b9..c77e66e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@ -12,8 +12,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.phoenix.compile.QueryPlan;
 
-import com.google.common.collect.ImmutableSet;
-
 public class PhoenixClientJoin extends PhoenixAbstractJoin {
 
     public PhoenixClientJoin(RelOptCluster cluster, RelTraitSet traits,
@@ -26,7 +24,7 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin {
     @Override
     public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, RelNode left,
             RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
-        return new PhoenixClientJoin(getCluster(), traits, left, right, condition, joinRelType, ImmutableSet.<String>of());
+        return new PhoenixClientJoin(getCluster(), traits, left, right, condition, joinRelType, variablesStopped);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
new file mode 100644
index 0000000..10f5518
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
@@ -0,0 +1,66 @@
+package org.apache.phoenix.calcite.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.ClientScanPlan;
+/** Phoenix relational node for a limit without ordering: a Sort whose collation must be empty (asserted in the constructor). */
+public class PhoenixLimit extends Sort implements PhoenixRel {
+    public final Integer statelessFetch; // fetch evaluated once at construction; null if fetch is absent or could not be evaluated statelessly
+    /** Evaluates fetch via CalciteUtils.evaluateStatelessExpression and asserts the Phoenix convention and an empty collation. */
+    public PhoenixLimit(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+        super(cluster, traits, input, collation, offset, fetch);
+        Object value = fetch == null ? null : CalciteUtils.evaluateStatelessExpression(fetch);
+        this.statelessFetch = value == null ? null : ((Number) value).intValue();        
+        assert getConvention() == PhoenixRel.CONVENTION;
+        assert getCollation().getFieldCollations().isEmpty();
+    }
+    /** Creates a new PhoenixLimit in this node's cluster from the given traits, input, collation, offset and fetch. */
+    @Override
+    public PhoenixLimit copy(RelTraitSet traitSet, RelNode newInput,
+            RelCollation newCollation, RexNode offset, RexNode fetch) {
+        return new PhoenixLimit(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+    }
+    /** Cost built from this node's row count (the other two cost components are 0), scaled by PHOENIX_FACTOR. */
+    @Override 
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        double rowCount = RelMetadataQuery.getRowCount(this);
+        return planner.getCostFactory()
+                .makeCost(rowCount, 0, 0)
+                .multiplyBy(PHOENIX_FACTOR);
+    }
+    /** The superclass row estimate, capped at statelessFetch when that value is known. */
+    @Override 
+    public double getRows() {
+        double rows = super.getRows();        
+        // TODO Should we apply a factor to ensure that a limit can be propagated to
+        // lower nodes as much as possible?
+        if (this.statelessFetch == null)
+            return rows;
+
+        return Math.min(this.statelessFetch, rows);
+    }
+    /** Implements the input, then wraps its plan in a ClientScanPlan that is given statelessFetch and an empty ORDER BY. */
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        assert getConvention() == getInput().getConvention();
+        // NOTE(review): offset is never applied here, and a null statelessFetch is passed through unchecked; confirm both are intended.
+        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        // TODO: wrap with ClientScanPlan only when the input plan already has
+        // a limit (plan.getLimit() != null);
+        // otherwise add the limit to "plan" directly.
+        return new ClientScanPlan(plan.getContext(), plan.getStatement(), 
+                implementor.getTableRef(), RowProjector.EMPTY_PROJECTOR, 
+                statelessFetch, null, OrderBy.EMPTY_ORDER_BY, plan);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
index c7cc60d..d19f0b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -4,6 +4,8 @@ import java.util.List;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.execute.TupleProjector;
@@ -21,6 +23,9 @@ import org.apache.phoenix.schema.TableRef;
 public interface PhoenixRel extends RelNode {
   /** Calling convention for relational operations that occur in Phoenix. */
   Convention CONVENTION = new Convention.Impl("PHOENIX", PhoenixRel.class);
+  
+  /** Metadata Provider for PhoenixRel */
+  RelMetadataProvider METADATA_PROVIDER = new PhoenixRelMetadataProvider();
 
   /** Relative cost of Phoenix versus Enumerable convention.
    *

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
index 8a4811a..b937df9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -28,7 +28,6 @@ import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
 public class PhoenixServerJoin extends PhoenixAbstractJoin {
@@ -43,7 +42,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin {
     @Override
     public PhoenixServerJoin copy(RelTraitSet traits, RexNode condition, RelNode left,
             RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
-        return new PhoenixServerJoin(getCluster(), traits, left, right, condition, joinRelType, ImmutableSet.<String>of());
+        return new PhoenixServerJoin(getCluster(), traits, left, right, condition, joinRelType, variablesStopped);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index 7902e27..b9bea64 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -18,6 +18,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.calcite.rules.PhoenixAddScanLimitRule;
 import org.apache.phoenix.calcite.rules.PhoenixClientJoinRule;
 import org.apache.phoenix.calcite.rules.PhoenixCompactClientSortRule;
 import org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule;
@@ -55,10 +56,17 @@ import com.google.common.collect.Lists;
  */
 public class PhoenixTableScan extends TableScan implements PhoenixRel {
     public final RexNode filter;
+    
+    /**
+     * Row limit copied into this scan by PhoenixAddScanLimitRule. It will not make a difference
+     * in implement(); it only gives a more accurate estimate of the row count.
+     */
+    public final Integer statelessFetch;
 
-    public PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter) {
+    public PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter, Integer statelessFetch) {
         super(cluster, traits, table);
         this.filter = filter;
+        this.statelessFetch = statelessFetch;
     }
 
     @Override
@@ -74,6 +82,8 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
             planner.addRule(rule);
         }
         planner.addRule(PhoenixFilterScanMergeRule.INSTANCE);
+        planner.addRule(PhoenixAddScanLimitRule.LIMIT_SCAN);
+        planner.addRule(PhoenixAddScanLimitRule.LIMIT_SERVERPROJECT_SCAN);
         planner.addRule(PhoenixServerProjectRule.PROJECT_SCAN);
         planner.addRule(PhoenixServerProjectRule.PROJECT_SERVERJOIN);
         planner.addRule(PhoenixServerJoinRule.JOIN_SCAN);
@@ -91,23 +101,28 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
     @Override
     public RelWriter explainTerms(RelWriter pw) {
         return super.explainTerms(pw)
-            .itemIf("filter", filter, filter != null);
+            .itemIf("filter", filter, filter != null)
+            .itemIf("statelessFetch", statelessFetch, statelessFetch != null);
     }
 
     @Override
     public RelOptCost computeSelfCost(RelOptPlanner planner) {
-        RelOptCost cost = super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
-        if (filter != null && !filter.isAlwaysTrue()) {
-            final Double selectivity = RelMetadataQuery.getSelectivity(this, filter);
-            cost = cost.multiplyBy(selectivity);
-        }
-        return cost;
+        double rowCount = RelMetadataQuery.getRowCount(this);
+        return planner.getCostFactory()
+                .makeCost(rowCount, rowCount + 1, 0)
+                .multiplyBy(PHOENIX_FACTOR);
     }
     
     @Override
     public double getRows() {
-        return super.getRows()
-                * RelMetadataQuery.getSelectivity(this, filter);
+        double rows = super.getRows();
+        if (filter != null && !filter.isAlwaysTrue()) {
+            rows = rows * RelMetadataQuery.getSelectivity(this, filter);
+        }        
+        if (statelessFetch == null)
+            return rows;
+        
+        return Math.min(statelessFetch, rows);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixAddScanLimitRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixAddScanLimitRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixAddScanLimitRule.java
new file mode 100644
index 0000000..5770f26
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixAddScanLimitRule.java
@@ -0,0 +1,74 @@
+package org.apache.phoenix.calcite.rules;
+
+import java.util.Collections;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.phoenix.calcite.rel.PhoenixLimit;
+import org.apache.phoenix.calcite.rel.PhoenixRel;
+import org.apache.phoenix.calcite.rel.PhoenixServerProject;
+import org.apache.phoenix.calcite.rel.PhoenixTableScan;
+
+import com.google.common.base.Predicate;
+/** Planner rule that copies a PhoenixLimit's stateless fetch into the PhoenixTableScan beneath it, directly or through a PhoenixServerProject. */
+public class PhoenixAddScanLimitRule extends RelOptRule {
+
+    /** Predicate that returns true if a limit's fetch is stateless. */
+    private static final Predicate<PhoenixLimit> IS_FETCH_STATELESS =
+        new Predicate<PhoenixLimit>() {
+            @Override
+            public boolean apply(PhoenixLimit phoenixLimit) {
+                return phoenixLimit.statelessFetch != null;
+            }
+        };
+
+    /** Predicate that returns true if a table scan has no stateless fetch. */
+    private static final Predicate<PhoenixTableScan> NO_STATELESSFETCH =
+        new Predicate<PhoenixTableScan>() {
+            @Override
+            public boolean apply(PhoenixTableScan phoenixTableScan) {
+                return phoenixTableScan.statelessFetch == null;
+            }
+        };
+    /** Matches a PhoenixLimit directly on top of a PhoenixTableScan that has no statelessFetch yet. */
+    public static final PhoenixAddScanLimitRule LIMIT_SCAN = 
+            new PhoenixAddScanLimitRule(
+                    "PhoenixAddScanLimitRule:limit_scan", 
+                    operand(PhoenixTableScan.class, null, NO_STATELESSFETCH, any()));
+    /** Matches a PhoenixLimit over a PhoenixServerProject over a PhoenixTableScan that has no statelessFetch yet. */
+    public static final PhoenixAddScanLimitRule LIMIT_SERVERPROJECT_SCAN = 
+            new PhoenixAddScanLimitRule(
+                    "PhoenixAddScanLimitRule:limit_serverproject_scan", 
+                    operand(PhoenixServerProject.class, 
+                            operand(PhoenixTableScan.class, null, NO_STATELESSFETCH, any())));
+    /** Roots the given input operand under a PhoenixLimit operand that only accepts stateless fetches. */
+    private PhoenixAddScanLimitRule(String description, RelOptRuleOperand input) {
+        super(
+            operand(PhoenixLimit.class, null, IS_FETCH_STATELESS, input), description);
+    }
+    /** Rebuilds the scan with the limit's statelessFetch, re-creates the project if one was matched, and re-parents the limit onto the result. */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        int relCount = call.getRelList().size(); // more than 2 rels means a project sits between the limit and the scan
+        PhoenixLimit limit = call.rel(0);
+        PhoenixServerProject project = null;
+        if (relCount > 2) {
+            project = call.rel(1);
+        }
+        PhoenixTableScan scan = call.rel(relCount - 1); // the scan is always the last matched rel
+        assert limit.statelessFetch != null : "predicate should have ensured fetch is stateless";
+        assert scan.statelessFetch == null : "predicate should have ensured table scan has no stateless fetch";
+        PhoenixTableScan newScan = new PhoenixTableScan(
+                scan.getCluster(), scan.getTraitSet(), scan.getTable(),
+                scan.filter, limit.statelessFetch);
+        PhoenixRel newInput = project == null ? 
+                  newScan 
+                : project.copy(project.getTraitSet(), newScan, 
+                        project.getProjects(), project.getRowType());
+        call.transformTo(limit.copy(limit.getTraitSet(), 
+                Collections.<RelNode>singletonList(newInput)));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index c6a5d36..2b7b870 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -11,15 +11,13 @@ import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalUnion;
 import org.apache.calcite.util.trace.CalciteTrace;
-import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientProject;
 import org.apache.phoenix.calcite.rel.PhoenixClientSort;
 import org.apache.phoenix.calcite.rel.PhoenixFilter;
-import org.apache.phoenix.calcite.rel.PhoenixAbstractProject;
 import org.apache.phoenix.calcite.rel.PhoenixJoin;
+import org.apache.phoenix.calcite.rel.PhoenixLimit;
 import org.apache.phoenix.calcite.rel.PhoenixRel;
-import org.apache.phoenix.calcite.rel.PhoenixAbstractSort;
 import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter;
 import org.apache.phoenix.calcite.rel.PhoenixUnion;
 
@@ -40,6 +38,7 @@ public class PhoenixConverterRules {
     public static final RelOptRule[] RULES = {
         PhoenixToEnumerableConverterRule.INSTANCE,
         PhoenixSortRule.INSTANCE,
+        PhoenixLimitRule.INSTANCE,
         PhoenixFilterRule.INSTANCE,
         PhoenixProjectRule.INSTANCE,
         PhoenixAggregateRule.INSTANCE,
@@ -73,7 +72,7 @@ public class PhoenixConverterRules {
 
     /**
      * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
-     * {@link PhoenixAbstractSort}.
+     * {@link PhoenixClientSort}.
      */
     private static class PhoenixSortRule extends PhoenixConverterRule {
         private static Predicate<LogicalSort> NON_EMPTY_COLLATION = new Predicate<LogicalSort>() {
@@ -102,6 +101,35 @@ public class PhoenixConverterRules {
     }
 
     /**
+     * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
+     * {@link PhoenixLimit}.
+     */
+    private static class PhoenixLimitRule extends PhoenixConverterRule {
+        private static Predicate<LogicalSort> EMPTY_COLLATION = new Predicate<LogicalSort>() {
+            @Override
+            public boolean apply(LogicalSort input) {
+                return input.getCollation().getFieldCollations().isEmpty(); // true only when the sort has no field collations
+            }            
+        };
+        
+        public static final PhoenixLimitRule INSTANCE = new PhoenixLimitRule();
+
+        private PhoenixLimitRule() {
+            super(LogicalSort.class, EMPTY_COLLATION, Convention.NONE, PhoenixRel.CONVENTION, // LogicalSort passing EMPTY_COLLATION, NONE -> Phoenix convention
+                "PhoenixLimitRule");
+        }
+
+        public RelNode convert(RelNode rel) {
+            final LogicalSort sort = (LogicalSort) rel;
+            final RelTraitSet traitSet =
+                sort.getTraitSet().replace(out);
+            return new PhoenixLimit(rel.getCluster(), traitSet,
+                convert(sort.getInput(), sort.getInput().getTraitSet().replace(out)), // input is converted to the same output trait first
+                sort.getCollation(), sort.offset, sort.fetch); // collation presumably empty given EMPTY_COLLATION; offset/fetch passed through unchanged
+        }
+    }
+
+    /**
      * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalFilter} to a
      * {@link PhoenixFilter}.
      */
@@ -126,7 +154,7 @@ public class PhoenixConverterRules {
 
     /**
      * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
-     * to a {@link PhoenixAbstractProject}.
+     * to a {@link PhoenixClientProject}.
      */
     private static class PhoenixProjectRule extends PhoenixConverterRule {
         private static final PhoenixProjectRule INSTANCE = new PhoenixProjectRule();
@@ -147,7 +175,7 @@ public class PhoenixConverterRules {
 
     /**
      * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalAggregate}
-     * to an {@link PhoenixAbstractAggregate}.
+     * to an {@link PhoenixClientAggregate}.
      */
     private static class PhoenixAggregateRule extends PhoenixConverterRule {
         public static final RelOptRule INSTANCE = new PhoenixAggregateRule();
@@ -193,8 +221,8 @@ public class PhoenixConverterRules {
     }
 
     /**
-     * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
-     * {@link PhoenixAbstractSort}.
+     * Rule to convert a {@link org.apache.calcite.rel.core.Join} to a
+     * {@link PhoenixJoin}.
      */
     private static class PhoenixJoinRule extends PhoenixConverterRule {
         public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule();
@@ -376,6 +404,8 @@ public class PhoenixConverterRules {
 
         @Override public RelNode convert(RelNode rel) {
             RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
+            // TODO Is there a better place to install the Phoenix metadata provider? Doing it here mutates the cluster as a side effect of conversion.
+            rel.getCluster().setMetadataProvider(PhoenixRel.METADATA_PROVIDER);
             return new PhoenixToEnumerableConverter(rel.getCluster(), newTraitSet, rel);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
index dd0f119..9a992b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
@@ -4,7 +4,7 @@ import com.google.common.base.Predicate;
 
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.core.Filter;
+import org.apache.phoenix.calcite.rel.PhoenixFilter;
 import org.apache.phoenix.calcite.rel.PhoenixTableScan;
 
 public class PhoenixFilterScanMergeRule extends RelOptRule {
@@ -22,17 +22,17 @@ public class PhoenixFilterScanMergeRule extends RelOptRule {
 
     private PhoenixFilterScanMergeRule() {
         super(
-            operand(Filter.class,
+            operand(PhoenixFilter.class, // match only PhoenixFilter nodes rather than any Filter
                 operand(PhoenixTableScan.class, null, NO_FILTER, any())));
     }
 
     @Override
     public void onMatch(RelOptRuleCall call) {
-        Filter filter = call.rel(0);
+        PhoenixFilter filter = call.rel(0);
         PhoenixTableScan scan = call.rel(1);
         assert scan.filter == null : "predicate should have ensured no filter";
         call.transformTo(new PhoenixTableScan(scan.getCluster(),
                 scan.getTraitSet(), scan.getTable(),
-                filter.getCondition()));
+                filter.getCondition(), scan.statelessFetch)); // carry over the scan's existing statelessFetch when folding in the filter
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index cc2a244..14c47c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -117,11 +117,6 @@ public class HashJoinPlan extends DelegateQueryPlan {
     public SubPlan[] getSubPlans() {
         return this.subPlans;
     }
-    
-    @Override
-    public Integer getLimit() {
-        return this.joinInfo == null ? null : this.joinInfo.getLimit();
-    }
 
     @Override
     public ResultIterator iterator() throws SQLException {