You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/16 04:26:48 UTC
phoenix git commit: PHOENIX-1859 Implement PhoenixLimit in
Phoenix/Calcite Integration
Repository: phoenix
Updated Branches:
refs/heads/calcite a0aca7f51 -> 2368ea6d3
PHOENIX-1859 Implement PhoenixLimit in Phoenix/Calcite Integration
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2368ea6d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2368ea6d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2368ea6d
Branch: refs/heads/calcite
Commit: 2368ea6d3a8375a35c492d62c6383e9a9eca9ee6
Parents: a0aca7f
Author: maryannxue <we...@intel.com>
Authored: Wed Apr 15 22:26:34 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Wed Apr 15 22:26:34 2015 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/calcite/CalciteTest.java | 58 +++++++++++++++
.../apache/phoenix/calcite/CalciteUtils.java | 15 ++++
.../apache/phoenix/calcite/PhoenixTable.java | 2 +-
.../calcite/metadata/PhoenixRelMdRowCount.java | 21 ++++++
.../metadata/PhoenixRelMetadataProvider.java | 15 ++++
.../calcite/rel/PhoenixAbstractSort.java | 37 +++++++---
.../phoenix/calcite/rel/PhoenixClientJoin.java | 4 +-
.../phoenix/calcite/rel/PhoenixLimit.java | 66 +++++++++++++++++
.../apache/phoenix/calcite/rel/PhoenixRel.java | 5 ++
.../phoenix/calcite/rel/PhoenixServerJoin.java | 3 +-
.../phoenix/calcite/rel/PhoenixTableScan.java | 35 ++++++---
.../calcite/rules/PhoenixAddScanLimitRule.java | 74 ++++++++++++++++++++
.../calcite/rules/PhoenixConverterRules.java | 46 +++++++++---
.../rules/PhoenixFilterScanMergeRule.java | 8 +--
.../apache/phoenix/execute/HashJoinPlan.java | 5 --
15 files changed, 353 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index 6d980e1..05dcc9e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -517,6 +517,64 @@ public class CalciteTest extends BaseClientManagedTimeIT {
.close();
}
+ @Test public void testLimit() {
+ start().sql("select organization_id, entity_id, a_string from aTable limit 5")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixLimit(fetch=[5])\n" +
+ " PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]], statelessFetch=[5])\n")
+ .resultIs(new Object[][] {
+ {"00D300000000XHP", "00A123122312312", "a"},
+ {"00D300000000XHP", "00A223122312312", "a"},
+ {"00D300000000XHP", "00A323122312312", "a"},
+ {"00D300000000XHP", "00A423122312312", "a"},
+ {"00D300000000XHP", "00B523122312312", "b"}})
+ .close();
+
+ start().sql("select count(entity_id), a_string from atable group by a_string limit 2")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixClientProject(EXPR$0=[$1], A_STRING=[$0])\n" +
+ " PhoenixLimit(fetch=[2])\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$0=[COUNT()])\n" +
+ " PhoenixServerProject(A_STRING=[$2])\n" +
+ " PhoenixTableScan(table=[[phoenix, ATABLE]])\n")
+ .resultIs(new Object[][] {
+ {4L, "a"},
+ {4L, "b"}})
+ .close();
+
+ start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name limit 3")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixLimit(fetch=[3])\n" +
+ " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()])\n" +
+ " PhoenixServerProject(NAME=[$2])\n" +
+ " PhoenixServerJoin(condition=[=($1, $0)], joinType=[inner])\n" +
+ " PhoenixServerProject(supplier_id=[$5])\n" +
+ " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
+ " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
+ .resultIs(new Object[][] {
+ {"S1", 2L},
+ {"S2", 2L},
+ {"S5", 1L}})
+ .close();
+
+ start().sql("SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" limit 3")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixClientProject(item_id=[$0], NAME=[$1], supplier_id=[$3], NAME0=[$4])\n" +
+ " PhoenixLimit(fetch=[3])\n" +
+ " PhoenixServerJoin(condition=[=($2, $3)], joinType=[inner])\n" +
+ " PhoenixServerProject(item_id=[$0], NAME=[$1], supplier_id=[$5])\n" +
+ " PhoenixTableScan(table=[[phoenix, ITEMTABLE]])\n" +
+ " PhoenixServerProject(supplier_id=[$0], NAME=[$1])\n" +
+ " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]])\n")
+ .resultIs(new Object[][] {
+ {"0000000001", "T1", "0000000001", "S1"},
+ {"0000000002", "T2", "0000000001", "S1"},
+ {"0000000003", "T3", "0000000002", "S2"}})
+ .close();
+ }
+
@Test public void testSubquery() {
start().sql("SELECT \"order_id\", quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")")
.explainIs("PhoenixToEnumerableConverter\n" +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index 5c4d7ab..a3fbce8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -155,6 +155,21 @@ public class CalciteUtils {
return (AggregateFunction) (fFactory.newFunction(aggFunc, exprs));
}
+ public static Object evaluateStatelessExpression(RexNode node) {
+ try {
+ Expression expression = toExpression(node, null);
+ if (expression.isStateless()) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ expression.evaluate(null, ptr);
+ return expression.getDataType().toObject(ptr);
+ }
+ } catch (Exception e) {
+ // Expression is not stateless. do nothing.
+ }
+
+ return null;
+ }
+
public static interface ExpressionFactory {
public Expression newExpression(RexNode node, Implementor implementor);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
index 1e74d6c..e47521b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
@@ -72,7 +72,7 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable {
@Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
final RelOptCluster cluster = context.getCluster();
- return new PhoenixTableScan(cluster, cluster.traitSetOf(PhoenixRel.CONVENTION), relOptTable, null);
+ return new PhoenixTableScan(cluster, cluster.traitSetOf(PhoenixRel.CONVENTION), relOptTable, null, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdRowCount.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdRowCount.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdRowCount.java
new file mode 100644
index 0000000..797867d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMdRowCount.java
@@ -0,0 +1,21 @@
+package org.apache.phoenix.calcite.metadata;
+
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.phoenix.calcite.rel.PhoenixAbstractSort;
+import org.apache.phoenix.calcite.rel.PhoenixLimit;
+
+public class PhoenixRelMdRowCount {
+ public static final RelMetadataProvider SOURCE =
+ ReflectiveRelMetadataProvider.reflectiveSource(
+ BuiltInMethod.ROW_COUNT.method, new PhoenixRelMdRowCount());
+
+ public Double getRowCount(PhoenixAbstractSort rel) {
+ return rel.getRows();
+ }
+
+ public Double getRowCount(PhoenixLimit rel) {
+ return rel.getRows();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java
new file mode 100644
index 0000000..ea37251
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/metadata/PhoenixRelMetadataProvider.java
@@ -0,0 +1,15 @@
+package org.apache.phoenix.calcite.metadata;
+
+import org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
+import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
+import com.google.common.collect.ImmutableList;
+
+public class PhoenixRelMetadataProvider extends ChainedRelMetadataProvider {
+
+ public PhoenixRelMetadataProvider() {
+ super(ImmutableList.of(
+ PhoenixRelMdRowCount.SOURCE,
+ new DefaultRelMetadataProvider()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
index 708b5ae..4598bf5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
@@ -3,6 +3,8 @@ package org.apache.phoenix.calcite.rel;
import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelFieldCollation;
@@ -10,8 +12,9 @@ import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.calcite.util.Util;
import org.apache.phoenix.calcite.CalciteUtils;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.execute.TupleProjector;
@@ -30,14 +33,35 @@ import com.google.common.collect.Lists;
abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
protected static final double CLIENT_MERGE_FACTOR = 0.5;
+ private final Integer statelessFetch;
+
public PhoenixAbstractSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation, RexNode offset, RexNode fetch) {
super(cluster, traits, child, collation, offset, fetch);
+ Object value = fetch == null ? null : CalciteUtils.evaluateStatelessExpression(fetch);
+ this.statelessFetch = value == null ? null : ((Number) value).intValue();
assert getConvention() == PhoenixRel.CONVENTION;
+ assert !getCollation().getFieldCollations().isEmpty();
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ // Fix rowCount for super class's computeSelfCost() with input's row count.
+ double rowCount = RelMetadataQuery.getRowCount(getInput());
+ double bytesPerRow = getRowType().getFieldCount() * 4;
+ return planner.getCostFactory().makeCost(
+ Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
+ }
+
+ @Override
+ public double getRows() {
+ double rows = super.getRows();
+ if (this.statelessFetch == null)
+ return rows;
+
+ return Math.min(this.statelessFetch, rows);
}
protected OrderBy getOrderBy(Implementor implementor, TupleProjector tupleProjector) {
- assert !getCollation().getFieldCollations().isEmpty();
-
List<OrderByExpression> orderByExpressions = Lists.newArrayList();
for (RelFieldCollation fieldCollation : getCollation().getFieldCollations()) {
Expression expr = tupleProjector == null ?
@@ -57,12 +81,9 @@ abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
if (this.fetch == null)
return null;
- Expression expr = CalciteUtils.toExpression(this.fetch, implementor);
- if (!expr.isStateless())
+ if (this.statelessFetch == null)
throw new UnsupportedOperationException("Stateful limit expression not supported");
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- expr.evaluate(null, ptr);
- return ((Number) (expr.getDataType().toObject(ptr))).intValue();
+ return this.statelessFetch;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
index 4c7c6b9..c77e66e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@ -12,8 +12,6 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
import org.apache.phoenix.compile.QueryPlan;
-import com.google.common.collect.ImmutableSet;
-
public class PhoenixClientJoin extends PhoenixAbstractJoin {
public PhoenixClientJoin(RelOptCluster cluster, RelTraitSet traits,
@@ -26,7 +24,7 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin {
@Override
public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, RelNode left,
RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
- return new PhoenixClientJoin(getCluster(), traits, left, right, condition, joinRelType, ImmutableSet.<String>of());
+ return new PhoenixClientJoin(getCluster(), traits, left, right, condition, joinRelType, variablesStopped);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
new file mode 100644
index 0000000..10f5518
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
@@ -0,0 +1,66 @@
+package org.apache.phoenix.calcite.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.ClientScanPlan;
+
+public class PhoenixLimit extends Sort implements PhoenixRel {
+ public final Integer statelessFetch;
+
+ public PhoenixLimit(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, input, collation, offset, fetch);
+ Object value = fetch == null ? null : CalciteUtils.evaluateStatelessExpression(fetch);
+ this.statelessFetch = value == null ? null : ((Number) value).intValue();
+ assert getConvention() == PhoenixRel.CONVENTION;
+ assert getCollation().getFieldCollations().isEmpty();
+ }
+
+ @Override
+ public PhoenixLimit copy(RelTraitSet traitSet, RelNode newInput,
+ RelCollation newCollation, RexNode offset, RexNode fetch) {
+ return new PhoenixLimit(getCluster(), traitSet, newInput, newCollation, offset, fetch);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ double rowCount = RelMetadataQuery.getRowCount(this);
+ return planner.getCostFactory()
+ .makeCost(rowCount, 0, 0)
+ .multiplyBy(PHOENIX_FACTOR);
+ }
+
+ @Override
+ public double getRows() {
+ double rows = super.getRows();
+ // TODO Should we apply a factor to ensure that a limit can be propagated to
+ // lower nodes as much as possible?
+ if (this.statelessFetch == null)
+ return rows;
+
+ return Math.min(this.statelessFetch, rows);
+ }
+
+ @Override
+ public QueryPlan implement(Implementor implementor) {
+ assert getConvention() == getInput().getConvention();
+
+ QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ // TODO only wrap with ClientScanPlan
+ // if (plan.getLimit() != null);
+ // otherwise add limit to "plan"
+ return new ClientScanPlan(plan.getContext(), plan.getStatement(),
+ implementor.getTableRef(), RowProjector.EMPTY_PROJECTOR,
+ statelessFetch, null, OrderBy.EMPTY_ORDER_BY, plan);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
index c7cc60d..d19f0b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -4,6 +4,8 @@ import java.util.List;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.execute.TupleProjector;
@@ -21,6 +23,9 @@ import org.apache.phoenix.schema.TableRef;
public interface PhoenixRel extends RelNode {
/** Calling convention for relational operations that occur in Phoenix. */
Convention CONVENTION = new Convention.Impl("PHOENIX", PhoenixRel.class);
+
+ /** Metadata Provider for PhoenixRel */
+ RelMetadataProvider METADATA_PROVIDER = new PhoenixRelMetadataProvider();
/** Relative cost of Phoenix versus Enumerable convention.
*
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
index 8a4811a..b937df9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -28,7 +28,6 @@ import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
public class PhoenixServerJoin extends PhoenixAbstractJoin {
@@ -43,7 +42,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin {
@Override
public PhoenixServerJoin copy(RelTraitSet traits, RexNode condition, RelNode left,
RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
- return new PhoenixServerJoin(getCluster(), traits, left, right, condition, joinRelType, ImmutableSet.<String>of());
+ return new PhoenixServerJoin(getCluster(), traits, left, right, condition, joinRelType, variablesStopped);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index 7902e27..b9bea64 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -18,6 +18,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.calcite.CalciteUtils;
import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.calcite.rules.PhoenixAddScanLimitRule;
import org.apache.phoenix.calcite.rules.PhoenixClientJoinRule;
import org.apache.phoenix.calcite.rules.PhoenixCompactClientSortRule;
import org.apache.phoenix.calcite.rules.PhoenixFilterScanMergeRule;
@@ -55,10 +56,17 @@ import com.google.common.collect.Lists;
*/
public class PhoenixTableScan extends TableScan implements PhoenixRel {
public final RexNode filter;
+
+ /**
+ * This will not make a difference in implement(), but rather give a more accurate
+ * estimate of the row count.
+ */
+ public final Integer statelessFetch;
- public PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter) {
+ public PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter, Integer statelessFetch) {
super(cluster, traits, table);
this.filter = filter;
+ this.statelessFetch = statelessFetch;
}
@Override
@@ -74,6 +82,8 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
planner.addRule(rule);
}
planner.addRule(PhoenixFilterScanMergeRule.INSTANCE);
+ planner.addRule(PhoenixAddScanLimitRule.LIMIT_SCAN);
+ planner.addRule(PhoenixAddScanLimitRule.LIMIT_SERVERPROJECT_SCAN);
planner.addRule(PhoenixServerProjectRule.PROJECT_SCAN);
planner.addRule(PhoenixServerProjectRule.PROJECT_SERVERJOIN);
planner.addRule(PhoenixServerJoinRule.JOIN_SCAN);
@@ -91,23 +101,28 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
@Override
public RelWriter explainTerms(RelWriter pw) {
return super.explainTerms(pw)
- .itemIf("filter", filter, filter != null);
+ .itemIf("filter", filter, filter != null)
+ .itemIf("statelessFetch", statelessFetch, statelessFetch != null);
}
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- RelOptCost cost = super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
- if (filter != null && !filter.isAlwaysTrue()) {
- final Double selectivity = RelMetadataQuery.getSelectivity(this, filter);
- cost = cost.multiplyBy(selectivity);
- }
- return cost;
+ double rowCount = RelMetadataQuery.getRowCount(this);
+ return planner.getCostFactory()
+ .makeCost(rowCount, rowCount + 1, 0)
+ .multiplyBy(PHOENIX_FACTOR);
}
@Override
public double getRows() {
- return super.getRows()
- * RelMetadataQuery.getSelectivity(this, filter);
+ double rows = super.getRows();
+ if (filter != null && !filter.isAlwaysTrue()) {
+ rows = rows * RelMetadataQuery.getSelectivity(this, filter);
+ }
+ if (statelessFetch == null)
+ return rows;
+
+ return Math.min(statelessFetch, rows);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixAddScanLimitRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixAddScanLimitRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixAddScanLimitRule.java
new file mode 100644
index 0000000..5770f26
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixAddScanLimitRule.java
@@ -0,0 +1,74 @@
+package org.apache.phoenix.calcite.rules;
+
+import java.util.Collections;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.phoenix.calcite.rel.PhoenixLimit;
+import org.apache.phoenix.calcite.rel.PhoenixRel;
+import org.apache.phoenix.calcite.rel.PhoenixServerProject;
+import org.apache.phoenix.calcite.rel.PhoenixTableScan;
+
+import com.google.common.base.Predicate;
+
+public class PhoenixAddScanLimitRule extends RelOptRule {
+
+ /** Predicate that returns true if a limit's fetch is stateless. */
+ private static final Predicate<PhoenixLimit> IS_FETCH_STATELESS =
+ new Predicate<PhoenixLimit>() {
+ @Override
+ public boolean apply(PhoenixLimit phoenixLimit) {
+ return phoenixLimit.statelessFetch != null;
+ }
+ };
+
+ /** Predicate that returns true if a table scan has no stateless fetch. */
+ private static final Predicate<PhoenixTableScan> NO_STATELESSFETCH =
+ new Predicate<PhoenixTableScan>() {
+ @Override
+ public boolean apply(PhoenixTableScan phoenixTableScan) {
+ return phoenixTableScan.statelessFetch == null;
+ }
+ };
+
+ public static final PhoenixAddScanLimitRule LIMIT_SCAN =
+ new PhoenixAddScanLimitRule(
+ "PhoenixAddScanLimitRule:limit_scan",
+ operand(PhoenixTableScan.class, null, NO_STATELESSFETCH, any()));
+
+ public static final PhoenixAddScanLimitRule LIMIT_SERVERPROJECT_SCAN =
+ new PhoenixAddScanLimitRule(
+ "PhoenixAddScanLimitRule:limit_serverproject_scan",
+ operand(PhoenixServerProject.class,
+ operand(PhoenixTableScan.class, null, NO_STATELESSFETCH, any())));
+
+ private PhoenixAddScanLimitRule(String description, RelOptRuleOperand input) {
+ super(
+ operand(PhoenixLimit.class, null, IS_FETCH_STATELESS, input), description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ int relCount = call.getRelList().size();
+ PhoenixLimit limit = call.rel(0);
+ PhoenixServerProject project = null;
+ if (relCount > 2) {
+ project = call.rel(1);
+ }
+ PhoenixTableScan scan = call.rel(relCount - 1);
+ assert limit.statelessFetch != null : "predicate should have ensured fetch is stateless";
+ assert scan.statelessFetch == null : "predicate should have ensured table scan has no stateless fetch";
+ PhoenixTableScan newScan = new PhoenixTableScan(
+ scan.getCluster(), scan.getTraitSet(), scan.getTable(),
+ scan.filter, limit.statelessFetch);
+ PhoenixRel newInput = project == null ?
+ newScan
+ : project.copy(project.getTraitSet(), newScan,
+ project.getProjects(), project.getRowType());
+ call.transformTo(limit.copy(limit.getTraitSet(),
+ Collections.<RelNode>singletonList(newInput)));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index c6a5d36..2b7b870 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -11,15 +11,13 @@ import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.util.trace.CalciteTrace;
-import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
import org.apache.phoenix.calcite.rel.PhoenixClientProject;
import org.apache.phoenix.calcite.rel.PhoenixClientSort;
import org.apache.phoenix.calcite.rel.PhoenixFilter;
-import org.apache.phoenix.calcite.rel.PhoenixAbstractProject;
import org.apache.phoenix.calcite.rel.PhoenixJoin;
+import org.apache.phoenix.calcite.rel.PhoenixLimit;
import org.apache.phoenix.calcite.rel.PhoenixRel;
-import org.apache.phoenix.calcite.rel.PhoenixAbstractSort;
import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter;
import org.apache.phoenix.calcite.rel.PhoenixUnion;
@@ -40,6 +38,7 @@ public class PhoenixConverterRules {
public static final RelOptRule[] RULES = {
PhoenixToEnumerableConverterRule.INSTANCE,
PhoenixSortRule.INSTANCE,
+ PhoenixLimitRule.INSTANCE,
PhoenixFilterRule.INSTANCE,
PhoenixProjectRule.INSTANCE,
PhoenixAggregateRule.INSTANCE,
@@ -73,7 +72,7 @@ public class PhoenixConverterRules {
/**
* Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
- * {@link PhoenixAbstractSort}.
+ * {@link PhoenixClientSort}.
*/
private static class PhoenixSortRule extends PhoenixConverterRule {
private static Predicate<LogicalSort> NON_EMPTY_COLLATION = new Predicate<LogicalSort>() {
@@ -102,6 +101,35 @@ public class PhoenixConverterRules {
}
/**
+ * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
+ * {@link PhoenixLimit}.
+ */
+ private static class PhoenixLimitRule extends PhoenixConverterRule {
+ private static Predicate<LogicalSort> EMPTY_COLLATION = new Predicate<LogicalSort>() {
+ @Override
+ public boolean apply(LogicalSort input) {
+ return input.getCollation().getFieldCollations().isEmpty();
+ }
+ };
+
+ public static final PhoenixLimitRule INSTANCE = new PhoenixLimitRule();
+
+ private PhoenixLimitRule() {
+ super(LogicalSort.class, EMPTY_COLLATION, Convention.NONE, PhoenixRel.CONVENTION,
+ "PhoenixLimitRule");
+ }
+
+ public RelNode convert(RelNode rel) {
+ final LogicalSort sort = (LogicalSort) rel;
+ final RelTraitSet traitSet =
+ sort.getTraitSet().replace(out);
+ return new PhoenixLimit(rel.getCluster(), traitSet,
+ convert(sort.getInput(), sort.getInput().getTraitSet().replace(out)),
+ sort.getCollation(), sort.offset, sort.fetch);
+ }
+ }
+
+ /**
* Rule to convert a {@link org.apache.calcite.rel.logical.LogicalFilter} to a
* {@link PhoenixFilter}.
*/
@@ -126,7 +154,7 @@ public class PhoenixConverterRules {
/**
* Rule to convert a {@link org.apache.calcite.rel.logical.LogicalProject}
- * to a {@link PhoenixAbstractProject}.
+ * to a {@link PhoenixClientProject}.
*/
private static class PhoenixProjectRule extends PhoenixConverterRule {
private static final PhoenixProjectRule INSTANCE = new PhoenixProjectRule();
@@ -147,7 +175,7 @@ public class PhoenixConverterRules {
/**
* Rule to convert a {@link org.apache.calcite.rel.logical.LogicalAggregate}
- * to an {@link PhoenixAbstractAggregate}.
+ * to an {@link PhoenixClientAggregate}.
*/
private static class PhoenixAggregateRule extends PhoenixConverterRule {
public static final RelOptRule INSTANCE = new PhoenixAggregateRule();
@@ -193,8 +221,8 @@ public class PhoenixConverterRules {
}
/**
- * Rule to convert a {@link org.apache.calcite.rel.core.Sort} to a
- * {@link PhoenixAbstractSort}.
+ * Rule to convert a {@link org.apache.calcite.rel.core.Join} to a
+ * {@link PhoenixJoin}.
*/
private static class PhoenixJoinRule extends PhoenixConverterRule {
public static final PhoenixJoinRule INSTANCE = new PhoenixJoinRule();
@@ -376,6 +404,8 @@ public class PhoenixConverterRules {
@Override public RelNode convert(RelNode rel) {
RelTraitSet newTraitSet = rel.getTraitSet().replace(getOutConvention());
+ // TODO Is there a better place to do this?
+ rel.getCluster().setMetadataProvider(PhoenixRel.METADATA_PROVIDER);
return new PhoenixToEnumerableConverter(rel.getCluster(), newTraitSet, rel);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
index dd0f119..9a992b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
@@ -4,7 +4,7 @@ import com.google.common.base.Predicate;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.core.Filter;
+import org.apache.phoenix.calcite.rel.PhoenixFilter;
import org.apache.phoenix.calcite.rel.PhoenixTableScan;
public class PhoenixFilterScanMergeRule extends RelOptRule {
@@ -22,17 +22,17 @@ public class PhoenixFilterScanMergeRule extends RelOptRule {
private PhoenixFilterScanMergeRule() {
super(
- operand(Filter.class,
+ operand(PhoenixFilter.class,
operand(PhoenixTableScan.class, null, NO_FILTER, any())));
}
@Override
public void onMatch(RelOptRuleCall call) {
- Filter filter = call.rel(0);
+ PhoenixFilter filter = call.rel(0);
PhoenixTableScan scan = call.rel(1);
assert scan.filter == null : "predicate should have ensured no filter";
call.transformTo(new PhoenixTableScan(scan.getCluster(),
scan.getTraitSet(), scan.getTable(),
- filter.getCondition()));
+ filter.getCondition(), scan.statelessFetch));
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2368ea6d/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index cc2a244..14c47c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -117,11 +117,6 @@ public class HashJoinPlan extends DelegateQueryPlan {
public SubPlan[] getSubPlans() {
return this.subPlans;
}
-
- @Override
- public Integer getLimit() {
- return this.joinInfo == null ? null : this.joinInfo.getLimit();
- }
@Override
public ResultIterator iterator() throws SQLException {