You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/12/08 01:38:40 UTC

[1/2] calcite git commit: [CALCITE-968] Stream-to-relation and stream-to-stream joins (Milinda Pathirage)

Repository: calcite
Updated Branches:
  refs/heads/master 9c86556ff -> 937fc461a


[CALCITE-968] Stream-to-relation and stream-to-stream joins (Milinda Pathirage)

Rule to transform Delta(Scan(constant-table)) to Empty;
fix NullPointerException in PruneEmptyRules.


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/e9d50602
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/e9d50602
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/e9d50602

Branch: refs/heads/master
Commit: e9d506021252e1da6c09cebad3f747cd0e627d90
Parents: 9c86556
Author: Milinda Pathirage <mi...@gmail.com>
Authored: Fri Nov 20 19:03:21 2015 -0500
Committer: Julian Hyde <jh...@apache.org>
Committed: Mon Dec 7 14:22:02 2015 -0800

----------------------------------------------------------------------
 .../calcite/rel/rules/PruneEmptyRules.java      |   3 +-
 .../apache/calcite/rel/stream/StreamRules.java  |  75 ++++++++++++-
 .../apache/calcite/runtime/CalciteResource.java |   3 +
 .../calcite/sql/validate/SqlValidatorImpl.java  |  45 ++++++--
 .../calcite/runtime/CalciteResource.properties  |   1 +
 .../apache/calcite/sql/test/SqlAdvisorTest.java |   4 +-
 .../apache/calcite/test/MockCatalogReader.java  |  16 +++
 .../apache/calcite/test/SqlValidatorTest.java   |  15 +++
 .../org/apache/calcite/test/StreamTest.java     | 108 +++++++++++++++++++
 9 files changed, 260 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
index 2f4cada..3fccdfa 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/PruneEmptyRules.java
@@ -72,7 +72,8 @@ public abstract class PruneEmptyRules {
         public void onMatch(RelOptRuleCall call) {
           LogicalUnion union = call.rel(0);
           final List<RelNode> childRels = call.getChildRels(union);
-          final List<RelNode> newChildRels = new ArrayList<RelNode>();
+          assert childRels != null;
+          final List<RelNode> newChildRels = new ArrayList<>();
           for (RelNode childRel : childRels) {
             if (!isEmpty(childRel)) {
               newChildRels.add(childRel);

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 28b9972..4e64dc5 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -24,16 +24,19 @@ import org.apache.calcite.prepare.RelOptTableImpl;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.Util;
@@ -56,7 +59,9 @@ public class StreamRules {
           new DeltaAggregateTransposeRule(),
           new DeltaSortTransposeRule(),
           new DeltaUnionTransposeRule(),
-          new DeltaTableScanRule());
+          new DeltaJoinTransposeRule(),
+          new DeltaTableScanRule(),
+          new DeltaTableScanToEmptyRule());
 
   /** Planner rule that pushes a {@link Delta} through a {@link Project}. */
   public static class DeltaProjectTransposeRule extends RelOptRule {
@@ -193,6 +198,74 @@ public class StreamRules {
       }
     }
   }
+
+  /**
+   * Planner rule that converts {@link Delta} over a {@link TableScan} of
+   * a table other than {@link org.apache.calcite.schema.StreamableTable} to Empty.
+   */
+  public static class DeltaTableScanToEmptyRule extends RelOptRule {
+    private DeltaTableScanToEmptyRule() {
+      super(
+          operand(Delta.class,
+              operand(TableScan.class, none())));
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      final TableScan scan = call.rel(1);
+      final RelOptCluster cluster = delta.getCluster();
+      final RelOptTable relOptTable = scan.getTable();
+      final StreamableTable streamableTable =
+          relOptTable.unwrap(StreamableTable.class);
+      if (streamableTable == null) {
+        call.transformTo(LogicalValues.createEmpty(cluster, delta.getRowType()));
+      }
+    }
+  }
+
+
+  /**
+   * Planner rule that pushes a {@link Delta} through a {@link Join}.
+   *
+   * Product rule [1] is applied to implement the transpose:
+   * stream(x join y)" = "x join stream(y) union all stream(x) join y
+   *
+   * [1] https://en.wikipedia.org/wiki/Product_rule
+   */
+  public static class DeltaJoinTransposeRule extends RelOptRule {
+
+    public DeltaJoinTransposeRule() {
+      super(
+          operand(Delta.class,
+              operand(Join.class, any())));
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final Delta delta = call.rel(0);
+      final Join join = call.rel(1);
+      final RelOptCluster cluster = delta.getCluster();
+      RelNode left = join.getLeft();
+      RelNode right = join.getRight();
+
+      final LogicalDelta rightWithDelta = LogicalDelta.create(right);
+      final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(),
+          join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(),
+          ImmutableList.copyOf(join.getSystemFieldList()));
+
+      final LogicalDelta leftWithDelta = LogicalDelta.create(left);
+      final LogicalJoin joinR = LogicalJoin.create(leftWithDelta, right, join.getCondition(),
+          join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(),
+          ImmutableList.copyOf(join.getSystemFieldList()));
+
+      List<RelNode> inputsToUnion = Lists.newArrayList();
+      inputsToUnion.add(joinL);
+      inputsToUnion.add(joinR);
+
+      final LogicalUnion newNode = LogicalUnion.create(inputsToUnion, true);
+      call.transformTo(newNode);
+    }
+  }
 }
 
 // End StreamRules.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index aa701d9..c06f3c3 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -603,6 +603,9 @@ public interface CalciteResource {
 
   @BaseMessage("Table ''{0}'' not found")
   ExInst<CalciteException> tableNotFound(String tableName);
+
+  @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.")
+  ExInst<SqlValidatorException> cannotStreamResultsForNonStreamingInputs(String inputs);
 }
 
 // End CalciteResource.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 0390c07..d430575 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -82,6 +82,7 @@ import org.apache.calcite.util.trace.CalciteTrace;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -3016,17 +3017,47 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
   public boolean validateModality(SqlSelect select, SqlModality modality,
       boolean fail) {
     final SelectScope scope = getRawSelectScope(select);
-    for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
-      if (!namespace.right.supportsModality(modality)) {
-        switch (modality) {
-        case STREAM:
+
+    switch (modality) {
+    case STREAM:
+      if (scope.children.size() == 1) {
+        for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+          if (!namespace.right.supportsModality(modality)) {
+            if (fail) {
+              throw newValidationError(namespace.right.getNode(),
+                  Static.RESOURCE.cannotConvertToStream(namespace.left));
+            } else {
+              return false;
+            }
+          }
+        }
+      } else {
+        boolean atLeastOneSupportsModality = false;
+        for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+          if (namespace.right.supportsModality(modality)) {
+            atLeastOneSupportsModality = true;
+          }
+        }
+
+        if (!atLeastOneSupportsModality) {
           if (fail) {
-            throw newValidationError(namespace.right.getNode(),
-                Static.RESOURCE.cannotConvertToStream(namespace.left));
+            List<String> inputList = new ArrayList<String>();
+            for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+              inputList.add(namespace.left);
+            }
+            String inputs = Joiner.on(", ").join(inputList);
+
+            throw newValidationError(select,
+                Static.RESOURCE.cannotStreamResultsForNonStreamingInputs(inputs));
           } else {
             return false;
           }
-        default:
+        }
+      }
+      break;
+    default:
+      for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
+        if (!namespace.right.supportsModality(modality)) {
           if (fail) {
             throw newValidationError(namespace.right.getNode(),
                 Static.RESOURCE.cannotConvertToRelation(namespace.left));

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 9787ba9..2eb4947 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -197,4 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must be based on a single
 MoreThanOneMappedColumn=View is not modifiable. More than one expression maps to column ''{0}'' of base table ''{1}''
 NoValueSuppliedForViewColumn=View is not modifiable. No value is supplied for NOT NULL column ''{0}'' of base table ''{1}''
 TableNotFound=Table ''{0}'' not found
+CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.
 # End CalciteResource.properties

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
index 1a71ec4..50019e1 100644
--- a/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
+++ b/core/src/test/java/org/apache/calcite/sql/test/SqlAdvisorTest.java
@@ -72,7 +72,9 @@ public class SqlAdvisorTest extends SqlValidatorTestCase {
           "TABLE(CATALOG.SALES.BONUS)",
           "TABLE(CATALOG.SALES.ORDERS)",
           "TABLE(CATALOG.SALES.SALGRADE)",
-          "TABLE(CATALOG.SALES.SHIPMENTS)");
+          "TABLE(CATALOG.SALES.SHIPMENTS)",
+          "TABLE(CATALOG.SALES.PRODUCTS)",
+          "TABLE(CATALOG.SALES.SUPPLIERS)");
 
   private static final List<String> SCHEMAS =
       Arrays.asList(

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
index 71b3115..b76b908 100644
--- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
@@ -250,6 +250,22 @@ public class MockCatalogReader implements Prepare.CatalogReader {
     shipmentsStream.addColumn("ORDERID", intType);
     registerTable(shipmentsStream);
 
+    // Register "PRODUCTS" table.
+    MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS",
+        false);
+    productsTable.addColumn("PRODUCTID", intType);
+    productsTable.addColumn("NAME", varchar20Type);
+    productsTable.addColumn("SUPPLIERID", intType);
+    registerTable(productsTable);
+
+    // Register "SUPPLIERS" table.
+    MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS",
+        false);
+    suppliersTable.addColumn("SUPPLIERID", intType);
+    suppliersTable.addColumn("NAME", varchar20Type);
+    suppliersTable.addColumn("CITY", intType);
+    registerTable(suppliersTable);
+
     // Register "EMP_20" view.
     // Same columns as "EMP",
     // but "DEPTNO" not visible and set to 20 by default

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 026f41e..7ace229 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -132,6 +132,12 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
     return "Cannot convert stream '" + table + "' to relation";
   }
 
+  private static String cannotStreamResultsForNonStreamingInputs(String inputs) {
+    return "Cannot stream results of a query with no streaming inputs: '"
+        + inputs
+        + "'. At least one input should be convertable to a stream.";
+  }
+
   @Test public void testMultipleSameAsPass() {
     check("select 1 as again,2 as \"again\", 3 as AGAiN from (values (true))");
   }
@@ -7415,6 +7421,15 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
         + "order by floor(rowtime to hour), rowtime desc").ok();
   }
 
+  @Test public void testStreamJoin() {
+    sql("select stream \n"
+        + "orders.rowtime as rowtime, orders.orderId as orderId, products.supplierId as supplierId \n"
+        + "from orders join products on orders.productId = products.productId").ok();
+    sql("^select stream *\n"
+        + "from products join suppliers on products.supplierId = suppliers.supplierId^")
+        .fails(cannotStreamResultsForNonStreamingInputs("PRODUCTS, SUPPLIERS"));
+  }
+
   @Test public void testNew() {
     // (To debug individual statements, paste them into this method.)
     //            1         2         3         4         5         6

http://git-wip-us.apache.org/repos/asf/calcite/blob/e9d50602/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 269a2e5..90d67dd 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -59,6 +59,7 @@ import static org.junit.Assert.assertThat;
 public class StreamTest {
   public static final String STREAM_SCHEMA_NAME = "STREAMS";
   public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
+  public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS";
 
   private static String schemaFor(String name, Class<? extends TableFactory> clazz) {
     return "     {\n"
@@ -74,6 +75,27 @@ public class StreamTest {
       + "     }";
   }
 
+  private static final String STREAM_JOINS_MODEL = "{\n"
+      + "  version: '1.0',\n"
+      + "  defaultSchema: 'STREAMJOINS',\n"
+      + "   schemas: [\n"
+      + "     {\n"
+      + "       name: 'STREAMJOINS',\n"
+      + "       tables: [ {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'ORDERS',\n"
+      + "         stream: {\n"
+      + "           stream: true\n"
+      + "         },\n"
+      + "         factory: '" + OrdersStreamTableFactory.class.getName() + "'\n"
+      + "       }, \n"
+      + "       {\n"
+      + "         type: 'custom',\n"
+      + "         name: 'PRODUCTS',\n"
+      + "         factory: '" + ProductsTableFactory.class.getName() + "'\n"
+      + "       }]\n"
+      + "     }]}";
+
   public static final String STREAM_MODEL = "{\n"
       + "  version: '1.0',\n"
       + "  defaultSchema: 'foodmart',\n"
@@ -212,6 +234,32 @@ public class StreamTest {
         .returnsCount(100);
   }
 
+  @Test public void testStreamToRelaitonJoin() {
+    CalciteAssert.model(STREAM_JOINS_MODEL)
+        .withDefaultSchema(STREAMJOINS_SCHEMA_NAME)
+        .query("select stream "
+            + "orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId "
+            + "from orders join products on orders.product = products.id")
+        .convertContains(
+            "LogicalDelta\n"
+                + "  LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
+                + "    LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
+                + "      LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
+                + "        LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
+                + "          LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n"
+                + "        LogicalTableScan(table=[[STREAMJOINS, PRODUCTS]])\n")
+        .explainContains(""
+            + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
+            + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n"
+            + "      EnumerableInterpreter\n"
+            + "        BindableTableScan(table=[[]])\n"
+            + "    EnumerableInterpreter\n"
+            + "      BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])")
+        .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
+            "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
+            "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
+  }
+
   private Function<ResultSet, Void> startsWith(String... rows) {
     final ImmutableList<String> rowList = ImmutableList.copyOf(rows);
     return new Function<ResultSet, Void>() {
@@ -362,6 +410,66 @@ public class StreamTest {
       return this;
     }
   }
+
+  /**
+   * Mocks simple relation to use for stream joining test.
+   */
+  public static class ProductsTableFactory implements TableFactory<Table> {
+
+    public ProductsTableFactory(){}
+
+    @Override
+    public Table create(SchemaPlus schema, String name, Map<String, Object> operand,
+                        RelDataType rowType) {
+      final ImmutableList<Object[]> rows = ImmutableList.of(
+        new Object[]{"paint", 1},
+        new Object[]{"paper", 0},
+        new Object[]{"brush", 1}
+      );
+      return new ProductsTable(rows);
+    }
+  }
+
+  /**
+   * Table representing the PRODUCTS relation
+   */
+  public static class ProductsTable implements ScannableTable {
+
+    private final ImmutableList<Object[]> rows;
+
+    public ProductsTable(ImmutableList<Object[]> rows) {
+      this.rows = rows;
+    }
+
+    private final RelProtoDataType protoRowType = new RelProtoDataType() {
+      public RelDataType apply(RelDataTypeFactory a0) {
+        return a0.builder()
+            .add("ID", SqlTypeName.VARCHAR, 32)
+            .add("SUPPLIER", SqlTypeName.INTEGER)
+            .build();
+      }
+    };
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+      return Linq4j.asEnumerable(rows);
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return protoRowType.apply(typeFactory);
+    }
+
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of());
+    }
+
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
 }
 
 // End StreamTest.java


[2/2] calcite git commit: Fix up [CALCITE-968]

Posted by jh...@apache.org.
Fix up [CALCITE-968]


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/937fc461
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/937fc461
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/937fc461

Branch: refs/heads/master
Commit: 937fc461a375818921e8147c2d23e26b7e8dfca0
Parents: e9d5060
Author: Julian Hyde <jh...@apache.org>
Authored: Sat Dec 5 14:36:02 2015 -0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Mon Dec 7 14:23:57 2015 -0800

----------------------------------------------------------------------
 .../apache/calcite/rel/stream/StreamRules.java  | 26 +++---
 .../apache/calcite/runtime/CalciteResource.java |  2 +-
 .../calcite/sql/validate/SqlValidatorImpl.java  |  6 +-
 .../calcite/runtime/CalciteResource.properties  |  2 +-
 .../apache/calcite/test/MockCatalogReader.java  |  4 +-
 .../apache/calcite/test/SqlValidatorTest.java   |  2 +-
 .../org/apache/calcite/test/StreamTest.java     | 84 +++++++++-----------
 7 files changed, 60 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 4e64dc5..48e7bbf 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -29,6 +29,7 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.core.Values;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -36,9 +37,9 @@ import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
 import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.logical.LogicalUnion;
-import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
@@ -201,7 +202,8 @@ public class StreamRules {
 
   /**
    * Planner rule that converts {@link Delta} over a {@link TableScan} of
-   * a table other than {@link org.apache.calcite.schema.StreamableTable} to Empty.
+   * a table other than {@link org.apache.calcite.schema.StreamableTable} to
+   * an empty {@link Values}.
    */
   public static class DeltaTableScanToEmptyRule extends RelOptRule {
     private DeltaTableScanToEmptyRule() {
@@ -213,24 +215,25 @@ public class StreamRules {
     @Override public void onMatch(RelOptRuleCall call) {
       final Delta delta = call.rel(0);
       final TableScan scan = call.rel(1);
-      final RelOptCluster cluster = delta.getCluster();
       final RelOptTable relOptTable = scan.getTable();
       final StreamableTable streamableTable =
           relOptTable.unwrap(StreamableTable.class);
+      final RelBuilder builder = call.builder();
       if (streamableTable == null) {
-        call.transformTo(LogicalValues.createEmpty(cluster, delta.getRowType()));
+        call.transformTo(builder.values(delta.getRowType()).build());
       }
     }
   }
 
-
   /**
    * Planner rule that pushes a {@link Delta} through a {@link Join}.
    *
-   * Product rule [1] is applied to implement the transpose:
-   * stream(x join y)" = "x join stream(y) union all stream(x) join y
+   * <p>We apply something analogous to the
+   * <a href="https://en.wikipedia.org/wiki/Product_rule">product rule of
+   * differential calculus</a> to implement the transpose:
    *
-   * [1] https://en.wikipedia.org/wiki/Product_rule
+   * <blockquote><code>stream(x join y) &rarr;
+   * x join stream(y) union all stream(x) join y</code></blockquote>
    */
   public static class DeltaJoinTransposeRule extends RelOptRule {
 
@@ -240,13 +243,12 @@ public class StreamRules {
               operand(Join.class, any())));
     }
 
-    @Override
     public void onMatch(RelOptRuleCall call) {
       final Delta delta = call.rel(0);
+      Util.discard(delta);
       final Join join = call.rel(1);
-      final RelOptCluster cluster = delta.getCluster();
-      RelNode left = join.getLeft();
-      RelNode right = join.getRight();
+      final RelNode left = join.getLeft();
+      final RelNode right = join.getRight();
 
       final LogicalDelta rightWithDelta = LogicalDelta.create(right);
       final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(),

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index c06f3c3..6eabafd 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -604,7 +604,7 @@ public interface CalciteResource {
   @BaseMessage("Table ''{0}'' not found")
   ExInst<CalciteException> tableNotFound(String tableName);
 
-  @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.")
+  @BaseMessage("Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream")
   ExInst<SqlValidatorException> cannotStreamResultsForNonStreamingInputs(String inputs);
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index d430575..1218d5a 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -3032,14 +3032,14 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
           }
         }
       } else {
-        boolean atLeastOneSupportsModality = false;
+        int supportsModalityCount = 0;
         for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {
           if (namespace.right.supportsModality(modality)) {
-            atLeastOneSupportsModality = true;
+            ++supportsModalityCount;
           }
         }
 
-        if (!atLeastOneSupportsModality) {
+        if (supportsModalityCount == 0) {
           if (fail) {
             List<String> inputList = new ArrayList<String>();
             for (Pair<String, SqlValidatorNamespace> namespace : scope.children) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index 2eb4947..7e97a51 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -197,5 +197,5 @@ ModifiableViewMustBeBasedOnSingleTable=Modifiable view must be based on a single
 MoreThanOneMappedColumn=View is not modifiable. More than one expression maps to column ''{0}'' of base table ''{1}''
 NoValueSuppliedForViewColumn=View is not modifiable. No value is supplied for NOT NULL column ''{0}'' of base table ''{1}''
 TableNotFound=Table ''{0}'' not found
-CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertable to a stream.
+CannotStreamResultsForNonStreamingInputs=Cannot stream results of a query with no streaming inputs: ''{0}''. At least one input should be convertible to a stream
 # End CalciteResource.properties

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
index b76b908..529df04 100644
--- a/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
+++ b/core/src/test/java/org/apache/calcite/test/MockCatalogReader.java
@@ -252,7 +252,7 @@ public class MockCatalogReader implements Prepare.CatalogReader {
 
     // Register "PRODUCTS" table.
     MockTable productsTable = MockTable.create(this, salesSchema, "PRODUCTS",
-        false);
+        false, 200D);
     productsTable.addColumn("PRODUCTID", intType);
     productsTable.addColumn("NAME", varchar20Type);
     productsTable.addColumn("SUPPLIERID", intType);
@@ -260,7 +260,7 @@ public class MockCatalogReader implements Prepare.CatalogReader {
 
     // Register "SUPPLIERS" table.
     MockTable suppliersTable = MockTable.create(this, salesSchema, "SUPPLIERS",
-        false);
+        false, 10D);
     suppliersTable.addColumn("SUPPLIERID", intType);
     suppliersTable.addColumn("NAME", varchar20Type);
     suppliersTable.addColumn("CITY", intType);

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index 7ace229..64d1c56 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -135,7 +135,7 @@ public class SqlValidatorTest extends SqlValidatorTestCase {
   private static String cannotStreamResultsForNonStreamingInputs(String inputs) {
     return "Cannot stream results of a query with no streaming inputs: '"
         + inputs
-        + "'. At least one input should be convertable to a stream.";
+        + "'. At least one input should be convertible to a stream";
   }
 
   @Test public void testMultipleSameAsPass() {

http://git-wip-us.apache.org/repos/asf/calcite/blob/937fc461/core/src/test/java/org/apache/calcite/test/StreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/StreamTest.java b/core/src/test/java/org/apache/calcite/test/StreamTest.java
index 90d67dd..649db3d 100644
--- a/core/src/test/java/org/apache/calcite/test/StreamTest.java
+++ b/core/src/test/java/org/apache/calcite/test/StreamTest.java
@@ -59,7 +59,7 @@ import static org.junit.Assert.assertThat;
 public class StreamTest {
   public static final String STREAM_SCHEMA_NAME = "STREAMS";
   public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
-  public static final String STREAMJOINS_SCHEMA_NAME = "STREAMJOINS";
+  public static final String STREAM_JOINS_SCHEMA_NAME = "STREAM_JOINS";
 
   private static String schemaFor(String name, Class<? extends TableFactory> clazz) {
     return "     {\n"
@@ -77,10 +77,10 @@ public class StreamTest {
 
   private static final String STREAM_JOINS_MODEL = "{\n"
       + "  version: '1.0',\n"
-      + "  defaultSchema: 'STREAMJOINS',\n"
+      + "  defaultSchema: 'STREAM_JOINS',\n"
       + "   schemas: [\n"
       + "     {\n"
-      + "       name: 'STREAMJOINS',\n"
+      + "       name: 'STREAM_JOINS',\n"
       + "       tables: [ {\n"
       + "         type: 'custom',\n"
       + "         name: 'ORDERS',\n"
@@ -234,30 +234,31 @@ public class StreamTest {
         .returnsCount(100);
   }
 
-  @Test public void testStreamToRelaitonJoin() {
+  @Test public void testStreamToRelationJoin() {
     CalciteAssert.model(STREAM_JOINS_MODEL)
-        .withDefaultSchema(STREAMJOINS_SCHEMA_NAME)
+        .withDefaultSchema(STREAM_JOINS_SCHEMA_NAME)
         .query("select stream "
             + "orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId "
             + "from orders join products on orders.product = products.id")
-        .convertContains(
-            "LogicalDelta\n"
-                + "  LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
-                + "    LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
-                + "      LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
-                + "        LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
-                + "          LogicalTableScan(table=[[STREAMJOINS, ORDERS]])\n"
-                + "        LogicalTableScan(table=[[STREAMJOINS, PRODUCTS]])\n")
+        .convertContains("LogicalDelta\n"
+            + "  LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$5])\n"
+            + "    LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], ID0=[$5], SUPPLIER=[$6])\n"
+            + "      LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n"
+            + "        LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT4=[CAST($2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL])\n"
+            + "          LogicalTableScan(table=[[STREAM_JOINS, ORDERS]])\n"
+            + "        LogicalTableScan(table=[[STREAM_JOINS, PRODUCTS]])\n")
         .explainContains(""
-            + "EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
+            + "EnumerableCalc(expr#0..6=[{inputs}], proj#0..1=[{exprs}], SUPPLIERID=[$t6])\n"
+            + "  EnumerableJoin(condition=[=($4, $5)], joinType=[inner])\n"
             + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL], proj#0..4=[{exprs}])\n"
             + "      EnumerableInterpreter\n"
             + "        BindableTableScan(table=[[]])\n"
             + "    EnumerableInterpreter\n"
-            + "      BindableTableScan(table=[[STREAMJOINS, PRODUCTS]])")
-        .returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
-            "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
-            "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
+            + "      BindableTableScan(table=[[STREAM_JOINS, PRODUCTS]])")
+        .returns(
+            startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1",
+                "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0",
+                "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
   }
 
   private Function<ResultSet, Void> startsWith(String... rows) {
@@ -324,14 +325,14 @@ public class StreamTest {
 
     public Table create(SchemaPlus schema, String name,
         Map<String, Object> operand, RelDataType rowType) {
-      final ImmutableList<Object[]> rows = ImmutableList.of(
-          new Object[] {ts(10, 15, 0), 1, "paint", 10},
-          new Object[] {ts(10, 24, 15), 2, "paper", 5},
-          new Object[] {ts(10, 24, 45), 3, "brush", 12},
-          new Object[] {ts(10, 58, 0), 4, "paint", 3},
-          new Object[] {ts(11, 10, 0), 5, "paint", 3});
-
-      return new OrdersTable(rows);
+      final Object[][] rows = {
+        {ts(10, 15, 0), 1, "paint", 10},
+        {ts(10, 24, 15), 2, "paper", 5},
+        {ts(10, 24, 45), 3, "brush", 12},
+        {ts(10, 58, 0), 4, "paint", 3},
+        {ts(11, 10, 0), 5, "paint", 3}
+      };
+      return new OrdersTable(ImmutableList.copyOf(rows));
     }
 
     private Object ts(int h, int m, int s) {
@@ -406,35 +407,30 @@ public class StreamTest {
       });
     }
 
-    @Override public Table stream() {
+    public Table stream() {
       return this;
     }
   }
 
   /**
-   * Mocks simple relation to use for stream joining test.
+   * Mocks a simple relation to use for stream joining test.
    */
   public static class ProductsTableFactory implements TableFactory<Table> {
-
-    public ProductsTableFactory(){}
-
-    @Override
-    public Table create(SchemaPlus schema, String name, Map<String, Object> operand,
-                        RelDataType rowType) {
-      final ImmutableList<Object[]> rows = ImmutableList.of(
-        new Object[]{"paint", 1},
-        new Object[]{"paper", 0},
-        new Object[]{"brush", 1}
-      );
-      return new ProductsTable(rows);
+    public Table create(SchemaPlus schema, String name,
+        Map<String, Object> operand, RelDataType rowType) {
+      final Object[][] rows = {
+        {"paint", 1},
+        {"paper", 0},
+        {"brush", 1}
+      };
+      return new ProductsTable(ImmutableList.copyOf(rows));
     }
   }
 
   /**
-   * Table representing the PRODUCTS relation
+   * Table representing the PRODUCTS relation.
    */
   public static class ProductsTable implements ScannableTable {
-
     private final ImmutableList<Object[]> rows;
 
     public ProductsTable(ImmutableList<Object[]> rows) {
@@ -450,22 +446,18 @@ public class StreamTest {
       }
     };
 
-    @Override
     public Enumerable<Object[]> scan(DataContext root) {
       return Linq4j.asEnumerable(rows);
     }
 
-    @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
       return protoRowType.apply(typeFactory);
     }
 
-    @Override
     public Statistic getStatistic() {
       return Statistics.of(200d, ImmutableList.<ImmutableBitSet>of());
     }
 
-    @Override
     public Schema.TableType getJdbcTableType() {
       return Schema.TableType.TABLE;
     }