Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/27 20:45:04 UTC

[GitHub] [beam] ibzib commented on a change in pull request #14729: [BEAM-9379] Update calcite to 1.26

ibzib commented on a change in pull request #14729:
URL: https://github.com/apache/beam/pull/14729#discussion_r697689522



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
##########
@@ -223,7 +223,8 @@ private void registerFunctions(BeamSqlEnvBuilder sqlEnvBuilder) {
    * <p>Any available implementation of {@link QueryPlanner} can be used as the query planner in
    * {@link SqlTransform}. An implementation can be specified globally for the entire pipeline with
    * {@link BeamSqlPipelineOptions#getPlannerName()}. The global planner can be overridden
-   * per-transform with {@link #withQueryPlannerClass(Class<? extends QueryPlanner>)}.
+   * per-transform with {@link #withQueryPlannerClass(Class) #withQueryPlannerClass(Class<? extends

Review comment:
       I think this change was intended to delete `#withQueryPlannerClass(Class<? extends QueryPlanner>)` (I'm guessing because Javadoc doesn't recognize generic method arguments).
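
A minimal sketch of the Javadoc point above, assuming the reviewer's guess is right that member references should list raw parameter types only. `JavadocLinkSketch` and its methods are hypothetical stand-ins, not Beam's `SqlTransform`. It shows the bare `{@link #method(Class)}` form and the `{@link #method(Class) label}` form, where the text after the reference is only a display label.

```java
/** Hypothetical stand-in for a transform with a generic setter; not Beam's SqlTransform. */
final class JavadocLinkSketch {
  private Class<? extends Runnable> plannerClass;

  /**
   * Sets the planner class.
   *
   * @param plannerClass any Runnable implementation (placeholder for a planner type)
   * @return this instance, for chaining
   */
  JavadocLinkSketch withPlannerClass(Class<? extends Runnable> plannerClass) {
    this.plannerClass = plannerClass;
    return this;
  }

  /**
   * Returns the class set with {@link #withPlannerClass(Class)}, or null if unset.
   *
   * <p>The reference lists only the raw parameter type. Text after the reference is a display
   * label, so the generic signature can still be shown to readers: {@link
   * #withPlannerClass(Class) withPlannerClass(Class&lt;? extends Runnable&gt;)}.
   */
  Class<? extends Runnable> getPlannerClass() {
    return plannerClass;
  }
}
```

If the label form is what the PR intended, the truncated line in the hunk may simply be the first half of it.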

##########
File path: sdks/java/extensions/sql/src/main/codegen/config.fmpp
##########
@@ -344,37 +366,59 @@ data: {
         "TBLPROPERTIES"
       ]
 
+      # List of non-reserved keywords to add;
+      # items in this list become non-reserved
+      nonReservedKeywordsToAdd: [
+      ]
+
+      # List of non-reserved keywords to remove;
+      # items in this list become reserved
+      nonReservedKeywordsToRemove: [
+      ]
+
       # List of additional join types. Each is a method with no arguments.
       # Example: LeftSemiJoin()
       joinTypes: [
       ]
 
       # List of methods for parsing custom SQL statements.
+      # Return type of method implementation should be 'SqlNode'.
+      # Example: SqlShowDatabases(), SqlShowTables().
       statementParserMethods: [
         "SqlSetOptionBeam(Span.of(), null)"
-        "SqlCreateExternalTable()"
         "SqlCreateFunction()"
       ]
 
       # List of methods for parsing custom literals.
+      # Return type of method implementation should be "SqlNode".
       # Example: ParseJsonLiteral().
       literalParserMethods: [
       ]
 
       # List of methods for parsing custom data types.
+      # Return type of method implementation should be "SqlTypeNameSpec".
+      # Example: SqlParseTimeStampZ().
       dataTypeParserMethods: [
       ]
 
+      # List of methods for parsing builtin function calls.
+      # Return type of method implementation should be "SqlNode".
+      # Example: DateFunctionCall().
+      builtinFunctionCallMethods: [
+      ]
+
       # List of methods for parsing extensions to "ALTER <scope>" calls.
       # Each must accept arguments "(SqlParserPos pos, String scope)".
+      # Example: "SqlUploadJarNode"
       alterStatementParserMethods: [
         "SqlSetOptionBeam"
       ]
 
       # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
       # Each must accept arguments "(SqlParserPos pos, boolean replace)".
       createStatementParserMethods: [
-        SqlCreateTableNotSupportedMessage
+        "SqlCreateExternalTable"

Review comment:
       Should we move CREATE FUNCTION into this block as well?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
##########
@@ -81,69 +61,68 @@
   private static final List<RelOptRule> LOGICAL_OPTIMIZATIONS =
       ImmutableList.of(
           // Rules for window functions
-          ProjectToWindowRule.PROJECT,
+          CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW,
           // Rules so we only have to implement Calc
-          FilterCalcMergeRule.INSTANCE,
-          ProjectCalcMergeRule.INSTANCE,
-          FilterToCalcRule.INSTANCE,
-          ProjectToCalcRule.INSTANCE,
+          CoreRules.FILTER_CALC_MERGE,
+          CoreRules.PROJECT_CALC_MERGE,
+          CoreRules.FILTER_TO_CALC,
+          CoreRules.PROJECT_TO_CALC,
           BeamIOPushDownRule.INSTANCE,
           // disabled due to https://issues.apache.org/jira/browse/BEAM-6810
-          // CalcRemoveRule.INSTANCE,
+          // CoreRules.CALC_REMOVE,
 
           // Rules to merge matching Calcs together.
           LogicalCalcMergeRule.INSTANCE,
           BeamCalcMergeRule.INSTANCE,
 
           // push a filter into a join
-          FilterJoinRule.FILTER_ON_JOIN,
+          CoreRules.FILTER_INTO_JOIN,
           // push filter into the children of a join
-          FilterJoinRule.JOIN,
+          CoreRules.JOIN_CONDITION_PUSH,
           // push filter through an aggregation
-          FilterAggregateTransposeRule.INSTANCE,
+          CoreRules.FILTER_AGGREGATE_TRANSPOSE,
           // push filter through set operation
-          FilterSetOpTransposeRule.INSTANCE,
+          CoreRules.FILTER_SET_OP_TRANSPOSE,
           // push project through set operation
-          ProjectSetOpTransposeRule.INSTANCE,
+          CoreRules.PROJECT_SET_OP_TRANSPOSE,
 
           // aggregation and projection rules
           BeamAggregateProjectMergeRule.INSTANCE,
           // push a projection past a filter or vice versa
-          ProjectFilterTransposeRule.INSTANCE,
-          FilterProjectTransposeRule.INSTANCE,
+          CoreRules.PROJECT_FILTER_TRANSPOSE,
+          CoreRules.FILTER_PROJECT_TRANSPOSE,
           // push a projection to the children of a join
           // merge projections
-          ProjectMergeRule.INSTANCE,
-          // ProjectRemoveRule.INSTANCE,
+          CoreRules.PROJECT_MERGE,
+          // CoreRules.PROJECT_REMOVE,
           // reorder sort and projection
-          SortProjectTransposeRule.INSTANCE,
-          ProjectSortTransposeRule.INSTANCE,
+          CoreRules.SORT_PROJECT_TRANSPOSE,
 
           // join rules
-          JoinPushExpressionsRule.INSTANCE,
-          JoinCommuteRule.INSTANCE,
+          CoreRules.JOIN_PUSH_EXPRESSIONS,
+          CoreRules.JOIN_COMMUTE,
           BeamJoinAssociateRule.INSTANCE,
           BeamJoinPushThroughJoinRule.RIGHT,
           BeamJoinPushThroughJoinRule.LEFT,
 
           // remove union with only a single child
-          UnionEliminatorRule.INSTANCE,
+          CoreRules.UNION_REMOVE,
           // convert non-all union into all-union + distinct
-          UnionToDistinctRule.INSTANCE,
+          CoreRules.UNION_TO_DISTINCT,
 
           // remove aggregation if it does not aggregate and input is already distinct
-          AggregateRemoveRule.INSTANCE,
+          CoreRules.AGGREGATE_REMOVE,
           // push aggregate through join
-          AggregateJoinTransposeRule.EXTENDED,
+          CoreRules.AGGREGATE_JOIN_TRANSPOSE_EXTENDED,
           // aggregate union rule
-          AggregateUnionAggregateRule.INSTANCE,
+          CoreRules.AGGREGATE_UNION_AGGREGATE,
 
           // reduce aggregate functions like AVG, STDDEV_POP etc.
-          // AggregateReduceFunctionsRule.INSTANCE,
+          CoreRules.AGGREGATE_REDUCE_FUNCTIONS,

Review comment:
       This used to be commented out. Did we mean to enable it?

##########
File path: sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
##########
@@ -160,7 +160,7 @@ SqlCreate SqlCreateExternalTable() :
 }
 {
 
-    <CREATE> <EXTERNAL> <TABLE> {
+    <EXTERNAL> <TABLE> {

Review comment:
       It doesn't look like there are any changes to the syntax. It seems the CREATE is implicit now because this syntax was moved to `createStatementParserMethods` in `sdks/java/extensions/sql/src/main/codegen/config.fmpp`.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
##########
@@ -81,69 +61,68 @@
   private static final List<RelOptRule> LOGICAL_OPTIMIZATIONS =
       ImmutableList.of(
           // Rules for window functions
-          ProjectToWindowRule.PROJECT,
+          CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW,
           // Rules so we only have to implement Calc
-          FilterCalcMergeRule.INSTANCE,
-          ProjectCalcMergeRule.INSTANCE,
-          FilterToCalcRule.INSTANCE,
-          ProjectToCalcRule.INSTANCE,
+          CoreRules.FILTER_CALC_MERGE,
+          CoreRules.PROJECT_CALC_MERGE,
+          CoreRules.FILTER_TO_CALC,
+          CoreRules.PROJECT_TO_CALC,
           BeamIOPushDownRule.INSTANCE,
           // disabled due to https://issues.apache.org/jira/browse/BEAM-6810
-          // CalcRemoveRule.INSTANCE,
+          // CoreRules.CALC_REMOVE,
 
           // Rules to merge matching Calcs together.
           LogicalCalcMergeRule.INSTANCE,
           BeamCalcMergeRule.INSTANCE,
 
           // push a filter into a join
-          FilterJoinRule.FILTER_ON_JOIN,
+          CoreRules.FILTER_INTO_JOIN,
           // push filter into the children of a join
-          FilterJoinRule.JOIN,
+          CoreRules.JOIN_CONDITION_PUSH,
           // push filter through an aggregation
-          FilterAggregateTransposeRule.INSTANCE,
+          CoreRules.FILTER_AGGREGATE_TRANSPOSE,
           // push filter through set operation
-          FilterSetOpTransposeRule.INSTANCE,
+          CoreRules.FILTER_SET_OP_TRANSPOSE,
           // push project through set operation
-          ProjectSetOpTransposeRule.INSTANCE,
+          CoreRules.PROJECT_SET_OP_TRANSPOSE,
 
           // aggregation and projection rules
           BeamAggregateProjectMergeRule.INSTANCE,
           // push a projection past a filter or vice versa
-          ProjectFilterTransposeRule.INSTANCE,
-          FilterProjectTransposeRule.INSTANCE,
+          CoreRules.PROJECT_FILTER_TRANSPOSE,
+          CoreRules.FILTER_PROJECT_TRANSPOSE,
           // push a projection to the children of a join
           // merge projections
-          ProjectMergeRule.INSTANCE,
-          // ProjectRemoveRule.INSTANCE,
+          CoreRules.PROJECT_MERGE,
+          // CoreRules.PROJECT_REMOVE,
           // reorder sort and projection
-          SortProjectTransposeRule.INSTANCE,
-          ProjectSortTransposeRule.INSTANCE,
+          CoreRules.SORT_PROJECT_TRANSPOSE,

Review comment:
       Looks like there is no CoreRules.PROJECT_SORT_TRANSPOSE. So I guess we don't need it?

##########
File path: CHANGES.md
##########
@@ -60,10 +60,12 @@
 
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Upgrade Flink runner to Flink versions 1.13.2, 1.12.5 and 1.11.4 ([BEAM-10955](https://issues.apache.org/jira/browse/BEAM-10955)).
+* Upgrade to Calcite 1.26.0 ([BEAM-9379](https://issues.apache.org/jira/browse/BEAM-9379).

Review comment:
       This will need to be moved into the 2.34.x section.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/RelMdNodeStats.java
##########
@@ -71,16 +72,34 @@ private NodeStats getBeamNodeStats(BeamRelNode rel, RelMetadataQuery mq) {
     // wraps the metadata provider with CachingRelMetadataProvider. However,
     // CachingRelMetadataProvider checks timestamp before returning previous results. Therefore,
     // there wouldn't be a problem in that case.
-    List<List> keys =
-        mq.map.entrySet().stream()
-            .filter(entry -> entry.getValue() instanceof NodeStats)
-            .filter(entry -> ((NodeStats) entry.getValue()).isUnknown())
-            .map(Map.Entry::getKey)
-            .collect(Collectors.toList());
-
-    for (List key : keys) {
-      mq.map.remove(key);
+    Set<Table.Cell<RelNode, List, Object>> cells = mq.map.cellSet();
+    List<Table.Cell<RelNode, List, Object>> keys = new ArrayList<>(cells.size());
+    for (Table.Cell<RelNode, List, Object> cell : cells) {

Review comment:
       Nit: can we keep the functional syntax, or is the null checker not able to handle that?
   
   ```
           List<Table.Cell<RelNode, List, Object>> keys =
               mq.map.cellSet().stream()
                   .filter(entry -> entry != null)
                   .filter(entry -> entry.getValue() instanceof NodeStats)
                   .filter(entry -> ((NodeStats) entry.getValue()).isUnknown())
                   .collect(Collectors.toList());
   ```
   
   Also, it seems nonsensical, not to mention highly unlikely, for the cell set to contain a null element, so we can throw instead of silently ignoring it.
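
One way to combine both suggestions (keep the stream form, fail fast on null) is sketched below. It uses plain Java stand-ins: `Cell` and `Stats` are hypothetical replacements for `Table.Cell` and `NodeStats`, and whether Beam's null checker accepts this shape is an assumption that would need to be confirmed against the real code.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

final class UnknownStatsFilterSketch {
  /** Hypothetical stand-in for NodeStats. */
  static final class Stats {
    private final boolean unknown;

    Stats(boolean unknown) {
      this.unknown = unknown;
    }

    boolean isUnknown() {
      return unknown;
    }
  }

  /** Hypothetical stand-in for a Table.Cell: a key plus a cached value. */
  static final class Cell {
    final String key;
    final Object value;

    Cell(String key, Object value) {
      this.key = key;
      this.value = value;
    }

    Object getValue() {
      return value;
    }
  }

  /** Returns the cells holding unknown Stats; throws NullPointerException on a null cell. */
  static List<Cell> unknownStatsCells(List<Cell> cells) {
    return cells.stream()
        // Fail fast instead of silently dropping a null element.
        .map(cell -> Objects.requireNonNull(cell, "cell set contained a null element"))
        .filter(cell -> cell.getValue() instanceof Stats)
        .filter(cell -> ((Stats) cell.getValue()).isUnknown())
        .collect(Collectors.toList());
  }
}
```

The `map` step replaces the `entry != null` filter, so a null element surfaces as an exception rather than being skipped.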

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamSqlUnparseContext.java
##########
@@ -114,6 +125,12 @@ public SqlNode toSql(RexProgram program, RexNode rex) {
       final String name = "null_param_" + index;
       nullParams.put(name, param.getType());
       return new NamedDynamicParam(index, POS, name);
+    } else if (SqlKind.SEARCH.equals(rex.getKind())) {
+      // Workaround bug in Calcite

Review comment:
       What bug? Is there a Jira?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlInOperatorRewriter.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql.translation;
+
+import java.util.List;
+import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/** Rewrites $in calls as SEARCH calls. */
+class SqlInOperatorRewriter implements SqlOperatorRewriter {
+  @Override
+  public RexNode apply(RexBuilder rexBuilder, List<RexNode> operands) {
+    Preconditions.checkArgument(
+        operands.size() >= 2, "IN should have at least two arguments in function call.");
+    final RexNode arg = operands.get(0);
+    final List<RexNode> ranges = ImmutableList.copyOf(operands.subList(1, operands.size()));
+
+    // ZetaSQL has wierd behavior for NULL...

Review comment:
       ```suggestion
       // ZetaSQL has weird behavior for NULL...
   ```

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSqlRelUtils.java
##########
@@ -50,7 +50,13 @@
     } else {
       return PCollectionList.of(
           inputRels.stream()
-              .map(input -> BeamSqlRelUtils.toPCollection(pipeline, (BeamRelNode) input, cache))
+              .map(
+                  input -> {
+                    if (input instanceof RelSubset) {
+                      input = ((RelSubset) input).getBest();

Review comment:
       Nit: might be superstitious of me, but I dislike mutating `input` here.
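
A possible non-mutating alternative is sketched below: the unwrapping moves into a small helper that returns a value, so the lambda parameter is never reassigned. `Node`, `Leaf`, `Subset`, and `resolve` are hypothetical stand-ins for `RelNode`, a concrete rel, `RelSubset`, and a helper that does not exist in the PR.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ResolveInputSketch {
  /** Hypothetical stand-in for RelNode. */
  interface Node {
    String name();
  }

  /** Hypothetical concrete node. */
  static final class Leaf implements Node {
    private final String name;

    Leaf(String name) {
      this.name = name;
    }

    @Override
    public String name() {
      return name;
    }
  }

  /** Hypothetical stand-in for RelSubset: wraps the best node chosen by the planner. */
  static final class Subset implements Node {
    private final Node best;

    Subset(Node best) {
      this.best = best;
    }

    Node getBest() {
      return best;
    }

    @Override
    public String name() {
      return "subset";
    }
  }

  /** Returns the wrapped best node for a Subset, or the input itself otherwise. */
  static Node resolve(Node input) {
    return input instanceof Subset ? ((Subset) input).getBest() : input;
  }

  /** Maps each input to the name of its resolved node without reassigning the parameter. */
  static List<String> names(List<Node> inputs) {
    return inputs.stream()
        .map(ResolveInputSketch::resolve)
        .map(Node::name)
        .collect(Collectors.toList());
  }
}
```

The sketch assumes `getBest()` always returns a node; the real code would also need to decide what to do if it does not.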

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java
##########
@@ -59,9 +60,19 @@ public void buildUp() {
     Table projectTable = getTable("TEST_PROJECT", PushDownOptions.PROJECT);
     Table filterTable = getTable("TEST_FILTER", PushDownOptions.FILTER);
     Table noneTable = getTable("TEST_NONE", PushDownOptions.NONE);
+
     tableProvider.createTable(projectTable);
     tableProvider.createTable(filterTable);
     tableProvider.createTable(noneTable);
+
+    // Rules are cost based, need rows to optimize!
+    tableProvider.addRows(
+        "TEST_PROJECT", Row.withSchema(BASIC_SCHEMA).addValues(1, 2, "3", 4).build());

Review comment:
       Do these need to be different rows?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/cep/CEPUtils.java
##########
@@ -156,11 +156,11 @@ public static String getRegexFromPattern(RexNode call) {
   }
 
   /** Transform the partition columns into serializable CEPFieldRef. */
-  public static List<CEPFieldRef> getCEPFieldRefFromParKeys(List<RexNode> parKeys) {
+  public static List<CEPFieldRef> getCEPFieldRefFromParKeys(ImmutableBitSet parKeys) {
     ArrayList<CEPFieldRef> fieldList = new ArrayList<>();
-    for (RexNode i : parKeys) {
-      RexInputRef parKey = (RexInputRef) i;
-      fieldList.add(new CEPFieldRef(parKey.getName(), parKey.getIndex()));
+    for (int index : parKeys.asList()) {
+      // FIXME: Don't know where to get a better name.

Review comment:
       If this is a "partition key," can't we just name it that?
   
   Same goes for BeamMatchRel.java.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
