You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/01/30 19:23:32 UTC

[beam] branch master updated: Cleanup ZetaSQLQueryPlanner and ExpressionConverter code

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cb02468  Cleanup ZetaSQLQueryPlanner and ExpressionConverter code
     new a926175  Merge pull request #10715 from robinyqiu/cleanup
cb02468 is described below

commit cb02468bbe216c307c800561710b992ef86a90fe
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Wed Jan 29 11:44:15 2020 -0800

    Cleanup ZetaSQLQueryPlanner and ExpressionConverter code
---
 .../beam/sdk/extensions/sql/SqlTransform.java      |  4 +--
 .../beam/sdk/extensions/sql/impl/QueryPlanner.java |  4 +--
 .../sql/zetasql/ZetaSQLQueryPlanner.java           | 31 ++++++++--------
 .../zetasql/translation/ExpressionConverter.java   | 41 +++++++++++-----------
 .../sql/zetasql/ZetaSQLDialectSpecTest.java        | 13 +++----
 5 files changed, 42 insertions(+), 51 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index 13186e0..adc3f9d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -199,11 +199,11 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
     return withTableProvider(name, tableProvider).toBuilder().setDefaultTableProvider(name).build();
   }
 
-  public SqlTransform withNamedParameters(Map parameters) {
+  public SqlTransform withNamedParameters(Map<String, ?> parameters) {
     return toBuilder().setQueryParameters(QueryParameters.ofNamed(parameters)).build();
   }
 
-  public SqlTransform withPositionalParameters(List parameters) {
+  public SqlTransform withPositionalParameters(List<?> parameters) {
     return toBuilder().setQueryParameters(QueryParameters.ofPositional(parameters)).build();
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
index 755737a..67e1f82 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
@@ -47,9 +47,9 @@ public interface QueryPlanner {
 
     abstract void none();
 
-    public abstract Map named();
+    public abstract Map<String, ?> named();
 
-    public abstract List positional();
+    public abstract List<?> positional();
 
     public static QueryParameters ofNone() {
       return AutoOneOf_QueryPlanner_QueryParameters.none();
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
index c430113..5f700e3 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
 import com.google.zetasql.Value;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.NonCumulativeCostImpl;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
@@ -97,11 +98,21 @@ public class ZetaSQLQueryPlanner implements QueryPlanner {
     return convertToBeamRel(sqlStatement, QueryParameters.ofNone());
   }
 
+  public BeamRelNode convertToBeamRel(String sqlStatement, Map<String, Value> queryParams)
+      throws ParseException, SqlConversionException {
+    return convertToBeamRel(sqlStatement, QueryParameters.ofNamed(queryParams));
+  }
+
+  public BeamRelNode convertToBeamRel(String sqlStatement, List<Value> queryParams)
+      throws ParseException, SqlConversionException {
+    return convertToBeamRel(sqlStatement, QueryParameters.ofPositional(queryParams));
+  }
+
   @Override
   public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters)
       throws ParseException, SqlConversionException {
     try {
-      return parseQuery(sqlStatement, queryParameters);
+      return convertToBeamRelInternal(sqlStatement, queryParameters);
     } catch (RelConversionException e) {
       throw new SqlConversionException(e.getCause());
     }
@@ -115,20 +126,7 @@ public class ZetaSQLQueryPlanner implements QueryPlanner {
             this.getClass().getCanonicalName()));
   }
 
-  public BeamRelNode convertToBeamRel(String sqlStatement, Map<String, Value> queryParams)
-      throws ParseException, SqlConversionException {
-    try {
-      return parseQuery(sqlStatement, QueryParameters.ofNamed(queryParams));
-    } catch (RelConversionException e) {
-      throw new SqlConversionException(e.getCause());
-    }
-  }
-
-  public BeamRelNode parseQuery(String sql) throws RelConversionException {
-    return parseQuery(sql, QueryParameters.ofNone());
-  }
-
-  public BeamRelNode parseQuery(String sql, QueryParameters queryParams)
+  private BeamRelNode convertToBeamRelInternal(String sql, QueryParameters queryParams)
       throws RelConversionException {
     RelRoot root = plannerImpl.rel(sql, queryParams);
     RelTraitSet desiredTraits =
@@ -149,8 +147,7 @@ public class ZetaSQLQueryPlanner implements QueryPlanner {
     RelMetadataQuery.THREAD_PROVIDERS.set(
         JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
     root.rel.getCluster().invalidateMetadataQuery();
-    BeamRelNode beamRelNode = (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
-    return beamRelNode;
+    return (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
   }
 
   private FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] ruleSets) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index eefd40b..5c7feb1 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -64,7 +64,6 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
-import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters.Kind;
 import org.apache.beam.sdk.extensions.sql.zetasql.SqlOperatorRewriter;
 import org.apache.beam.sdk.extensions.sql.zetasql.SqlOperators;
 import org.apache.beam.sdk.extensions.sql.zetasql.SqlStdOperatorMappingTable;
@@ -1002,27 +1001,27 @@ public class ExpressionConverter {
   }
 
   private RexNode convertResolvedParameter(ResolvedParameter parameter) {
-    if (queryParams.getKind() == Kind.NAMED) {
-      Map<String, Value> queryParameterMap = (Map<String, Value>) queryParams.named();
-      Value value = queryParameterMap.get(parameter.getName());
-      Preconditions.checkState(
-          parameter.getType().equals(value.getType()),
-          String.format(
-              "Expected resolved parameter %s to have type %s, but it has type %s",
-              parameter.getName(), value.getType(), parameter.getType()));
-      return convertValueToRexNode(value.getType(), value);
-    } else if (queryParams.getKind() == Kind.POSITIONAL) {
-      List<Value> queryParameterList = (List<Value>) queryParams.positional();
-      // parameter is 1-indexed, while parameter list is 0-indexed.
-      Value value = queryParameterList.get((int) parameter.getPosition() - 1);
-      Preconditions.checkState(
-          parameter.getType().equals(value.getType()),
-          String.format(
-              "Expected resolved parameter %d to have type %s, but it has type %s",
-              parameter.getPosition(), value.getType(), parameter.getType()));
-      return convertValueToRexNode(value.getType(), value);
+    Value value;
+    String identifier;
+    switch (queryParams.getKind()) {
+      case NAMED:
+        value = ((Map<String, Value>) queryParams.named()).get(parameter.getName());
+        identifier = parameter.getName();
+        break;
+      case POSITIONAL:
+        // parameter is 1-indexed, while parameter list is 0-indexed.
+        value = ((List<Value>) queryParams.positional()).get((int) parameter.getPosition() - 1);
+        identifier = Long.toString(parameter.getPosition());
+        break;
+      default:
+        throw new IllegalArgumentException("Found unexpected parameter " + parameter);
     }
-    throw new IllegalArgumentException("Found unexpected parameter " + parameter);
+    Preconditions.checkState(
+        parameter.getType().equals(value.getType()),
+        String.format(
+            "Expected resolved parameter %s to have type %s, but it has type %s",
+            identifier, value.getType(), parameter.getType()));
+    return convertValueToRexNode(value.getType(), value);
   }
 
   private RexNode convertResolvedStructFieldAccess(ResolvedGetStructField resolvedGetStructField) {
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index bdfbf27..bf12bb7 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -58,7 +58,6 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
-import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -215,8 +214,7 @@ public class ZetaSQLDialectSpecTest {
     ImmutableList<Value> params = ImmutableList.of(Value.createStringValue("abc\n"));
 
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode =
-        zetaSQLQueryPlanner.convertToBeamRel(sql, QueryParameters.ofPositional(params));
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
     PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
     final Schema schema = Schema.builder().addNullableField("ColA", FieldType.STRING).build();
@@ -332,8 +330,7 @@ public class ZetaSQLDialectSpecTest {
     ImmutableList<Value> params =
         ImmutableList.of(Value.createInt64Value(4L), Value.createInt64Value(5L));
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode =
-        zetaSQLQueryPlanner.convertToBeamRel(sql, QueryParameters.ofPositional(params));
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
     PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
     final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
@@ -431,8 +428,7 @@ public class ZetaSQLDialectSpecTest {
             Value.createBoolValue(true), Value.createInt64Value(1), Value.createInt64Value(2));
 
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode =
-        zetaSQLQueryPlanner.convertToBeamRel(sql, QueryParameters.ofPositional(params));
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
     PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
     final Schema schema = Schema.builder().addNullableField("field1", FieldType.INT64).build();
@@ -3012,8 +3008,7 @@ public class ZetaSQLDialectSpecTest {
             Value.createStringValue("c"));
 
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode =
-        zetaSQLQueryPlanner.convertToBeamRel(sql, QueryParameters.ofPositional(params));
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
     PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
     final Schema schema = Schema.builder().addStringField("field1").build();
     PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abc").build());