You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/12/08 01:38:40 UTC
[1/2] calcite git commit: [CALCITE-968] Stream-to-relation and
stream-to-stream joins (Milinda Pathirage)
Repository: calcite
Updated Branches:
refs/heads/master 9c86556ff -> 937fc461a
[CALCITE-968] Stream-to-relation and stream-to-stream joins (Milinda Pathirage)
Rule to transform Delta(Scan(constant-table)) to Empty;
fix NullPointerException in PruneEmptyRules.
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/e9d50602
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/e9d50602
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/e9d50602
Branch: refs/heads/master
Commit: e9d506021252e1da6c09cebad3f747cd0e627d90
Parents: 9c86556
Author: Milinda Pathirage <mi...@gmail.com>
Authored: Fri Nov 20 19:03:21 2015 -0500
Committer: Julian Hyde <jh...@apache.org>
Committed: Mon Dec 7 14:22:02 2015 -0800
----------------------------------------------------------------------
.../calcite/rel/rules/PruneEmptyRules.java | 3 +-
.../apache/calcite/rel/stream/StreamRules.java | 75 ++++++++++++-
.../apache/calcite/runtime/CalciteResource.java | 3 +
.../calcite/sql/validate/SqlValidatorImpl.java | 45 ++++++--
.../calcite/runtime/CalciteResource.properties | 1 +
.../apache/calcite/sql/test/SqlAdvisorTest.java | 4 +-
.../apache/calcite/test/MockCatalogReader.java | 16 +++
.../apache/calcite/test/SqlValidatorTest.java | 15 +++
.../org/apache/calcite/test/StreamTest.java | 108 +++++++++++++++++++
9 files changed, 260 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
index 2f4cada..3fccdfa 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
@@ -72,7 +72,8 @@ public abstract class PruneEmptyRules {
public void onMatch(RelOptRuleCall call) {
LogicalUnion union = call.rel(0);
final List<RelNode> childRels = call.getChildRels(union);
- final List<RelNode> newChildRels = new ArrayList<RelNode>();
+ assert childRels != null;
+ final List<RelNode> newChildRels = new ArrayList<>();
for (RelNode childRel : childRels) {
if (!isEmpty(childRel)) {
newChildRels.add(childRel);
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 28b9972..4e64dc5 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -24,16 +24,19 @@ import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Union;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.util.Util;
@@ -56,7 +59,9 @@ public class StreamRules {
new DeltaAggregateTransposeRule(),
new DeltaSortTransposeRule(),
new DeltaUnionTransposeRule(),
- new DeltaTableScanRule());
+ new DeltaJoinTransposeRule(),
+ new DeltaTableScanRule(),
+ new DeltaTableScanToEmptyRule());
/** Planner rule that pushes a {@link Delta} through a {@link Project}. */
public static class DeltaProjectTransposeRule extends RelOptRule {
@@ -193,6 +198,74 @@ public class StreamRules {
}
}
}
+
+ /**
+ * Planner rule that converts {@link Delta} over a {@link TableScan} of
+ * a table other than {@link org.apache.calcite.schema.StreamableTable} to Empty.
+ */
+ public static class DeltaTableScanToEmptyRule extends RelOptRule {
+ private DeltaTableScanToEmptyRule() {
+ super(
+ operand(Delta.class,
+ operand(TableScan.class, none())));
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ final Delta delta = call.rel(0);
+ final TableScan scan = call.rel(1);
+ final RelOptCluster cluster = delta.getCluster();
+ final RelOptTable relOptTable = scan.getTable();
+ final StreamableTable streamableTable =
+ relOptTable.unwrap(StreamableTable.class);
+ if (streamableTable == null) {
+ call.transformTo(LogicalValues.createEmpty(cluster, delta.getRowType()));
+ }
+ }
+ }
+
+
+ /**
+ * Planner rule that pushes a {@link Delta} through a {@link Join}.
+ *
+ * Product rule [1] is applied to implement the transpose:
+ * stream(x join y)" = "x join stream(y) union all stream(x) join y
+ *
+ * [1] https://en.wikipedia.org/wiki/Product_rule
+ */
+ public static class DeltaJoinTransposeRule extends RelOptRule {
+
+ public DeltaJoinTransposeRule() {
+ super(
+ operand(Delta.class,
+ operand(Join.class, any())));
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final Delta delta = call.rel(0);
+ final Join join = call.rel(1);
+ final RelOptCluster cluster = delta.getCluster();
+ RelNode left = join.getLeft();
+ RelNode right = join.getRight();
+
+ final LogicalDelta rightWithDelta = LogicalDelta.create(right);
+ final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(),
+ join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(),
+ ImmutableList.copyOf(join.getSystemFieldList()));
+
+ final LogicalDelta leftWithDelta = LogicalDelta.create(left);
+ final LogicalJoin joinR = LogicalJoin.create(leftWithDelta, right, join.getCondition(),
+ join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(),
+ ImmutableList.copyOf(join.getSystemFieldList()));
+
+ List<RelNode> inputsToUnion = Lists.newArrayList();
+ inputsToUnion.add(joinL);
+ inputsToUnion.add(joinR);
+
+ final LogicalUnion newNode = LogicalUnion.create(inputsToUnion, true);
+ call.transformTo(newNode);
+ }
+ }
}
// End StreamRules.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index aa701d9..c06f3c3 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -603,6 +603,9 @@ public interface CalciteResource {
@BaseMessage("Table ''{0}'' not found")
ExInst<CalciteException> tableNotFound(String tableName);
+
+ @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.")
+ ExInst<SqlValidatorException> cannotStreamResultsForNonStreamingInputs(String inputs);
}
// End CalciteResource.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 0390c07..d430575 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -82,6 +82,7 @@ import org.apache.calcite.util.trace.CalciteTrace;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -3016,17 +3017,47 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
public boolean validateModality(SqlSelect select, SqlModality modality,
boolean fail) {
final SelectScope scope = getRawSelectScope(select);
- for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
- if (!namespace.right.supportsModality(modality)) {
- switch (modality) {
- case STREAM:
+
+ switch (modality) {
+ case STREAM:
+ if (scope.children.size() == 1) {
+ for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+ if (!namespace.right.supportsModality(modality)) {
+ if (fail) {
+ throw newValidationError(namespace.right.getNode(),
+ Static.RESOURCE.cannotConvertToStream(namespace.left));
+ } else {
+ return false;
+ }
+ }
+ }
+ } else {
+ boolean atLeastOneSupportsModality = false;
+ for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+ if (namespace.right.supportsModality(modality)) {
+ atLeastOneSupportsModality = true;
+ }
+ }
+
+ if (!atLeastOneSupportsModality) {
if (fail) {
- throw newValidationError(namespace.right.getNode(),
- Static.RESOURCE.cannotConvertToStream(namespace.left));
+ List<String> inputList = new ArrayList<String>();
+ for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+ inputList.add(namespace.left);
+ }
+ String inputs = Joiner.on(", ").join(inputList);
+
+ throw newValidationError(select,
+ Static.RESOURCE.cannotStreamResultsForNonStreamingInputs(inputs));
} else {
return false;
}
- default:
+ }
+ }
+ break;
+ default:
+ for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+ if (!namespace.right.supportsModality(modality)) {
if (fail) {
throw newValidationError(namespace.right.getNode(),
Static.RESOURCE.cannotConvertToRelation(namespace.left));
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 9787ba9..2eb4947 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -197,4 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must be based on a single
MoreThanOneMappedColumn=View is not modifiable. More than one expression maps to column ''{0}'' of base table ''{1}''
NoValueSuppliedForViewColumn=View is not modifiable. No value is supplied for NOT NULL column ''{0}'' of base table ''{1}''
TableNotFound=Table ''{0}'' not found
+CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.
# End CalciteResource.properties
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
index 1a71ec4..50019e1 100644
--- a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
+++ b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
@@ -72,7 +72,9 @@ public class SqlAdvisorTest extends SqlValidatorTestCase {
"TABLE(CATALOG.SALES.BONUS)",
"TABLE(CATALOG.SALES.ORDERS)",
"TABLE(CATALOG.SALES.SALGRADE)",
- "TABLE(CATALOG.SALES.SHIPMENTS)");
+ "TABLE(CATALOG.SALES.SHIPMENTS)",
+ "TABLE(CATALOG.SALES.PRODUCTS)",
+ "TABLE(CATALOG.SALES.SUPPLIERS)");
private static final List<String> SCHEMAS =
Arrays.asList(
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
index 71b3115..b76b908 100644
--- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
@@ -250,6 +250,22 @@ public class MockCatalogReader implements Prepare.CatalogReader {
shipmentsStream.addColumn("ORDERID", intType);
registerTable(shipmentsStream);
+ // Register "PRODUCTS" table.
+ MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS",
+ false);
+ productsTable.addColumn("PRODUCTID", intType);
+ productsTable.addColumn("NAME", varchar20Type);
+ productsTable.addColumn("SUPPLIERID", intType);
+ registerTable(productsTable);
+
+ // Register "SUPPLIERS" table.
+ MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS",
+ false);
+ suppliersTable.addColumn("SUPPLIERID", intType);
+ suppliersTable.addColumn("NAME", varchar20Type);
+ suppliersTable.addColumn("CITY", intType);
+ registerTable(suppliersTable);
+
// Register "EMP_20" view.
// Same columns as "EMP",
// but "DEPTNO" not visible and set to 20 by default
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 026f41e..7ace229 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -132,6 +132,12 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
return "Cannot convert stream '" + table + "' to relation";
}
+ private static String cannotStreamResultsForNonStreamingInputs(String inputs) {
+ return "Cannot stream results of a query with no streaming inputs: '"
+ + inputs
+ + "'. At least one input should be convertable to a stream.";
+ }
+
@Test public void testMultipleSameAsPass() {
check("select 1 as again,2 as \"again\", 3 as AGAiN from (values (true))");
}
@@ -7415,6 +7421,15 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
+ "order by floor(rowtime to hour), rowtime desc").ok();
}
+ @Test public void testStreamJoin() {
+ sql("select stream \n"
+ + "orders.rowtime as rowtime, orders.orderId as orderId, products.supplierId as supplierId \n"
+ + "from orders join products on orders.productId = products.productId").ok();
+ sql("^select stream *\n"
+ + "from products join suppliers on products.supplierId = suppliers.supplierId^")
+ .fails(cannotStreamResultsForNonStreamingInputs("PRODUCTS, SUPPLIERS"));
+ }
+
@Test public void testNew() {
// (To debug individual statements, paste them into this method.)
// 1 2 3 4 5 6
http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 269a2e5..90d67dd 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -59,6 +59,7 @@ import static org.junit.Assert.assertThat;
public class StreamTest {
public static final String STREAM_SCHEMA_NAME = "STREAMS";
public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
+ public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS";
private static String schemaFor(String name, Class<? extends TableFactory> clazz) {
return " {\n"
@@ -74,6 +75,27 @@ public class StreamTest {
+ " }";
}
+ private static final String STREAM_JOINS_MODEL = "{\n"
+ + " version: '1.0',\n"
+ + " defaultSchema: 'STREAMJOINS',\n"
+ + " schemas: [\n"
+ + " {\n"
+ + " name: 'STREAMJOINS',\n"
+ + " tables: [ {\n"
+ + " type: 'custom',\n"
+ + " name: 'ORDERS',\n"
+ + " stream: {\n"
+ + " stream: true\n"
+ + " },\n"
+ + " factory: '" + OrdersStreamTableFactory.class.getName() + "'\n"
+ + " }, \n"
+ + " {\n"
+ + " type: 'custom',\n"
+ + " name: 'PRODUCTS',\n"
+ + " factory: '" + ProductsTableFactory.class.getName() + "'\n"
+ + " }]\n"
+ + " }]}";
+
public static final String STREAM_MODEL = "{\n"
+ " version: '1.0',\n"
+ " defaultSchema: 'foodmart',\n"
@@ -212,6 +234,32 @@ public class StreamTest {
.returnsCount(100);
}
+ @Test public void testStreamToRelaitonJoin() {
+ CalciteAssert.model(STREAM_JOINS_MODEL)
+ .withDefaultSchema(STREAMJOINS_SCHEMA_NAME)
+ .query("select stream "
+ + "orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId "
+ + "from orders join products on orders.product = products.id")
+ .convertContains(
+ "LogicalDelta\n"
+ + " LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
+ + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
+ + " LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
+ + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
+ + " LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n"
+ + " LogicalTableScan(table=[[STREAMJOINS, PRODUCTS]])\n")
+ .explainContains(""
+ + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
+ + " EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n"
+ + " EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[]])\n"
+ + " EnumerableInterpreter\n"
+ + " BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])")
+ .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
+ "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
+ "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
+ }
+
private Function<ResultSet, Void> startsWith(String... rows) {
final ImmutableList<String> rowList = ImmutableList.copyOf(rows);
return new Function<ResultSet, Void>() {
@@ -362,6 +410,66 @@ public class StreamTest {
return this;
}
}
+
+ /**
+ * Mocks simple relation to use for stream joining test.
+ */
+ public static class ProductsTableFactory implements TableFactory<Table> {
+
+ public ProductsTableFactory(){}
+
+ @Override
+ public Table create(SchemaPlus schema, String name, Map<String, Object> operand,
+ RelDataType rowType) {
+ final ImmutableList<Object[]> rows = ImmutableList.of(
+ new Object[]{"paint", 1},
+ new Object[]{"paper", 0},
+ new Object[]{"brush", 1}
+ );
+ return new ProductsTable(rows);
+ }
+ }
+
+ /**
+ * Table representing the PRODUCTS relation
+ */
+ public static class ProductsTable implements ScannableTable {
+
+ private final ImmutableList<Object[]> rows;
+
+ public ProductsTable(ImmutableList<Object[]> rows) {
+ this.rows = rows;
+ }
+
+ private final RelProtoDataType protoRowType = new RelProtoDataType() {
+ public RelDataType apply(RelDataTypeFactory a0) {
+ return a0.builder()
+ .add("ID", SqlTypeName.VARCHAR, 32)
+ .add("SUPPLIER", SqlTypeName.INTEGER)
+ .build();
+ }
+ };
+
+ @Override
+ public Enumerable<Object[]> scan(DataContext root) {
+ return Linq4j.asEnumerable(rows);
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return protoRowType.apply(typeFactory);
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of());
+ }
+
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.TABLE;
+ }
+ }
}
// End StreamTest.java
[2/2] calcite git commit: Fix up [CALCITE-968]
Posted by jh...@apache.org.
Fix up [CALCITE-968]
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/937fc461
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/937fc461
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/937fc461
Branch: refs/heads/master
Commit: 937fc461a375818921e8147c2d23e26b7e8dfca0
Parents: e9d5060
Author: Julian Hyde <jh...@apache.org>
Authored: Sat Dec 5 14:36:02 2015 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Mon Dec 7 14:23:57 2015 -0800
----------------------------------------------------------------------
.../apache/calcite/rel/stream/StreamRules.java | 26 +++---
.../apache/calcite/runtime/CalciteResource.java | 2 +-
.../calcite/sql/validate/SqlValidatorImpl.java | 6 +-
.../calcite/runtime/CalciteResource.properties | 2 +-
.../apache/calcite/test/MockCatalogReader.java | 4 +-
.../apache/calcite/test/SqlValidatorTest.java | 2 +-
.../org/apache/calcite/test/StreamTest.java | 84 +++++++++-----------
7 files changed, 60 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 4e64dc5..48e7bbf 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.core.Values;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -36,9 +37,9 @@ import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.logical.LogicalUnion;
-import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
+import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.util.Util;
import com.google.common.collect.ImmutableList;
@@ -201,7 +202,8 @@ public class StreamRules {
/**
* Planner rule that converts {@link Delta} over a {@link TableScan} of
- * a table other than {@link org.apache.calcite.schema.StreamableTable} to Empty.
+ * a table other than {@link org.apache.calcite.schema.StreamableTable} to
+ * an empty {@link Values}.
*/
public static class DeltaTableScanToEmptyRule extends RelOptRule {
private DeltaTableScanToEmptyRule() {
@@ -213,24 +215,25 @@ public class StreamRules {
@Override public void onMatch(RelOptRuleCall call) {
final Delta delta = call.rel(0);
final TableScan scan = call.rel(1);
- final RelOptCluster cluster = delta.getCluster();
final RelOptTable relOptTable = scan.getTable();
final StreamableTable streamableTable =
relOptTable.unwrap(StreamableTable.class);
+ final RelBuilder builder = call.builder();
if (streamableTable == null) {
- call.transformTo(LogicalValues.createEmpty(cluster, delta.getRowType()));
+ call.transformTo(builder.values(delta.getRowType()).build());
}
}
}
-
/**
* Planner rule that pushes a {@link Delta} through a {@link Join}.
*
- * Product rule [1] is applied to implement the transpose:
- * stream(x join y)" = "x join stream(y) union all stream(x) join y
+ * <p>We apply something analogous to the
+ * <a href="https://en.wikipedia.org/wiki/Product_rule">product rule of
+ * differential calculus</a> to implement the transpose:
*
- * [1] https://en.wikipedia.org/wiki/Product_rule
+ * <blockquote><code>stream(x join y) →
+ * x join stream(y) union all stream(x) join y</code></blockquote>
*/
public static class DeltaJoinTransposeRule extends RelOptRule {
@@ -240,13 +243,12 @@ public class StreamRules {
operand(Join.class, any())));
}
- @Override
public void onMatch(RelOptRuleCall call) {
final Delta delta = call.rel(0);
+ Util.discard(delta);
final Join join = call.rel(1);
- final RelOptCluster cluster = delta.getCluster();
- RelNode left = join.getLeft();
- RelNode right = join.getRight();
+ final RelNode left = join.getLeft();
+ final RelNode right = join.getRight();
final LogicalDelta rightWithDelta = LogicalDelta.create(right);
final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(),
http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index c06f3c3..6eabafd 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -604,7 +604,7 @@ public interface CalciteResource {
@BaseMessage("Table ''{0}'' not found")
ExInst<CalciteException> tableNotFound(String tableName);
- @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.")
+ @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream")
ExInst<SqlValidatorException> cannotStreamResultsForNonStreamingInputs(String inputs);
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index d430575..1218d5a 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -3032,14 +3032,14 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
}
}
} else {
- boolean atLeastOneSupportsModality = false;
+ int supportsModalityCount = 0;
for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
if (namespace.right.supportsModality(modality)) {
- atLeastOneSupportsModality = true;
+ ++supportsModalityCount;
}
}
- if (!atLeastOneSupportsModality) {
+ if (supportsModalityCount == 0) {
if (fail) {
List<String> inputList = new ArrayList<String>();
for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 2eb4947..7e97a51 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -197,5 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must be based on a single
MoreThanOneMappedColumn=View is not modifiable. More than one expression maps to column ''{0}'' of base table ''{1}''
NoValueSuppliedForViewColumn=View is not modifiable. No value is supplied for NOT NULL column ''{0}'' of base table ''{1}''
TableNotFound=Table ''{0}'' not found
-CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.
+CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream
# End CalciteResource.properties
http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
index b76b908..529df04 100644
--- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
@@ -252,7 +252,7 @@ public class MockCatalogReader implements Prepare.CatalogReader {
// Register "PRODUCTS" table.
MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS",
- false);
+ false, 200D);
productsTable.addColumn("PRODUCTID", intType);
productsTable.addColumn("NAME", varchar20Type);
productsTable.addColumn("SUPPLIERID", intType);
@@ -260,7 +260,7 @@ public class MockCatalogReader implements Prepare.CatalogReader {
// Register "SUPPLIERS" table.
MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS",
- false);
+ false, 10D);
suppliersTable.addColumn("SUPPLIERID", intType);
suppliersTable.addColumn("NAME", varchar20Type);
suppliersTable.addColumn("CITY", intType);
http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 7ace229..64d1c56 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -135,7 +135,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
private static String cannotStreamResultsForNonStreamingInputs(String inputs) {
return "Cannot stream results of a query with no streaming inputs: '"
+ inputs
- + "'. At least one input should be convertable to a stream.";
+ + "'. At least one input should be convertible to a stream";
}
@Test public void testMultipleSameAsPass() {
http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 90d67dd..649db3d 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -59,7 +59,7 @@ import static org.junit.Assert.assertThat;
public class StreamTest {
public static final String STREAM_SCHEMA_NAME = "STREAMS";
public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
- public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS";
+ public static final String STREAM_JOINS_SCHEMA_NAME = "STREAM_JOINS";
private static String schemaFor(String name, Class<? extends TableFactory> clazz) {
return " {\n"
@@ -77,10 +77,10 @@ public class StreamTest {
private static final String STREAM_JOINS_MODEL = "{\n"
+ " version: '1.0',\n"
- + " defaultSchema: 'STREAMJOINS',\n"
+ + " defaultSchema: 'STREAM_JOINS',\n"
+ " schemas: [\n"
+ " {\n"
- + " name: 'STREAMJOINS',\n"
+ + " name: 'STREAM_JOINS',\n"
+ " tables: [ {\n"
+ " type: 'custom',\n"
+ " name: 'ORDERS',\n"
@@ -234,30 +234,31 @@ public class StreamTest {
.returnsCount(100);
}
- @Test public void testStreamToRelaitonJoin() {
+ @Test public void testStreamToRelationJoin() {
CalciteAssert.model(STREAM_JOINS_MODEL)
- .withDefaultSchema(STREAMJOINS_SCHEMA_NAME)
+ .withDefaultSchema(STREAM_JOINS_SCHEMA_NAME)
.query("select stream "
+ "orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId "
+ "from orders join products on orders.product = products.id")
- .convertContains(
- "LogicalDelta\n"
- + " LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
- + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
- + " LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
- + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
- + " LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n"
- + " LogicalTableScan(table=[[STREAMJOINS, PRODUCTS]])\n")
+ .convertContains("LogicalDelta\n"
+ + " LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
+ + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
+ + " LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
+ + " LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
+ + " LogicalTableScan(table=[[STREAM_JOINS, ORDERS]])\n"
+ + " LogicalTableScan(table=[[STREAM_JOINS, PRODUCTS]])\n")
.explainContains(""
- + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
+ + "EnumerableCalc(expr#0..6=[{inputs}], proj#0..1=[{exprs}], SUPPLIERID=[$t6])\n"
+ + " EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
+ " EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n"
+ " EnumerableInterpreter\n"
+ " BindableTableScan(table=[[]])\n"
+ " EnumerableInterpreter\n"
- + " BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])")
- .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
- "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
- "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
+ + " BindableTableScan(table=[[STREAM_JOINS, PRODUCTS]])")
+ .returns(
+ startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
+ "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
+ "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
}
private Function<ResultSet, Void> startsWith(String... rows) {
@@ -324,14 +325,14 @@ public class StreamTest {
public Table create(SchemaPlus schema, String name,
Map<String, Object> operand, RelDataType rowType) {
- final ImmutableList<Object[]> rows = ImmutableList.of(
- new Object[] {ts(10, 15, 0), 1, "paint", 10},
- new Object[] {ts(10, 24, 15), 2, "paper", 5},
- new Object[] {ts(10, 24, 45), 3, "brush", 12},
- new Object[] {ts(10, 58, 0), 4, "paint", 3},
- new Object[] {ts(11, 10, 0), 5, "paint", 3});
-
- return new OrdersTable(rows);
+ final Object[][] rows = {
+ {ts(10, 15, 0), 1, "paint", 10},
+ {ts(10, 24, 15), 2, "paper", 5},
+ {ts(10, 24, 45), 3, "brush", 12},
+ {ts(10, 58, 0), 4, "paint", 3},
+ {ts(11, 10, 0), 5, "paint", 3}
+ };
+ return new OrdersTable(ImmutableList.copyOf(rows));
}
private Object ts(int h, int m, int s) {
@@ -406,35 +407,30 @@ public class StreamTest {
});
}
- @Override public Table stream() {
+ public Table stream() {
return this;
}
}
/**
- * Mocks simple relation to use for stream joining test.
+ * Mocks a simple relation to use for stream joining test.
*/
public static class ProductsTableFactory implements TableFactory<Table> {
-
- public ProductsTableFactory(){}
-
- @Override
- public Table create(SchemaPlus schema, String name, Map<String, Object> operand,
- RelDataType rowType) {
- final ImmutableList<Object[]> rows = ImmutableList.of(
- new Object[]{"paint", 1},
- new Object[]{"paper", 0},
- new Object[]{"brush", 1}
- );
- return new ProductsTable(rows);
+ public Table create(SchemaPlus schema, String name,
+ Map<String, Object> operand, RelDataType rowType) {
+ final Object[][] rows = {
+ {"paint", 1},
+ {"paper", 0},
+ {"brush", 1}
+ };
+ return new ProductsTable(ImmutableList.copyOf(rows));
}
}
/**
- * Table representing the PRODUCTS relation
+ * Table representing the PRODUCTS relation.
*/
public static class ProductsTable implements ScannableTable {
-
private final ImmutableList<Object[]> rows;
public ProductsTable(ImmutableList<Object[]> rows) {
@@ -450,22 +446,18 @@ public class StreamTest {
}
};
- @Override
public Enumerable<Object[]> scan(DataContext root) {
return Linq4j.asEnumerable(rows);
}
- @Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return protoRowType.apply(typeFactory);
}
- @Override
public Statistic getStatistic() {
return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of());
}
- @Override
public Schema.TableType getJdbcTableType() {
return Schema.TableType.TABLE;
}