You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/01/30 19:23:32 UTC
[beam] branch master updated: Cleanup ZetaSQLQueryPlanner and
ExpressionConverter code
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cb02468 Cleanup ZetaSQLQueryPlanner and ExpressionConverter code
new a926175 Merge pull request #10715 from robinyqiu/cleanup
cb02468 is described below
commit cb02468bbe216c307c800561710b992ef86a90fe
Author: Yueyang Qiu <ro...@gmail.com>
AuthorDate: Wed Jan 29 11:44:15 2020 -0800
Cleanup ZetaSQLQueryPlanner and ExpressionConverter code
---
.../beam/sdk/extensions/sql/SqlTransform.java | 4 +--
.../beam/sdk/extensions/sql/impl/QueryPlanner.java | 4 +--
.../sql/zetasql/ZetaSQLQueryPlanner.java | 31 ++++++++--------
.../zetasql/translation/ExpressionConverter.java | 41 +++++++++++-----------
.../sql/zetasql/ZetaSQLDialectSpecTest.java | 13 +++----
5 files changed, 42 insertions(+), 51 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index 13186e0..adc3f9d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -199,11 +199,11 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
return withTableProvider(name, tableProvider).toBuilder().setDefaultTableProvider(name).build();
}
- public SqlTransform withNamedParameters(Map parameters) {
+ public SqlTransform withNamedParameters(Map<String, ?> parameters) {
return toBuilder().setQueryParameters(QueryParameters.ofNamed(parameters)).build();
}
- public SqlTransform withPositionalParameters(List parameters) {
+ public SqlTransform withPositionalParameters(List<?> parameters) {
return toBuilder().setQueryParameters(QueryParameters.ofPositional(parameters)).build();
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
index 755737a..67e1f82 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
@@ -47,9 +47,9 @@ public interface QueryPlanner {
abstract void none();
- public abstract Map named();
+ public abstract Map<String, ?> named();
- public abstract List positional();
+ public abstract List<?> positional();
public static QueryParameters ofNone() {
return AutoOneOf_QueryPlanner_QueryParameters.none();
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
index c430113..5f700e3 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql.zetasql;
import com.google.zetasql.Value;
+import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.NonCumulativeCostImpl;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
@@ -97,11 +98,21 @@ public class ZetaSQLQueryPlanner implements QueryPlanner {
return convertToBeamRel(sqlStatement, QueryParameters.ofNone());
}
+ public BeamRelNode convertToBeamRel(String sqlStatement, Map<String, Value> queryParams)
+ throws ParseException, SqlConversionException {
+ return convertToBeamRel(sqlStatement, QueryParameters.ofNamed(queryParams));
+ }
+
+ public BeamRelNode convertToBeamRel(String sqlStatement, List<Value> queryParams)
+ throws ParseException, SqlConversionException {
+ return convertToBeamRel(sqlStatement, QueryParameters.ofPositional(queryParams));
+ }
+
@Override
public BeamRelNode convertToBeamRel(String sqlStatement, QueryParameters queryParameters)
throws ParseException, SqlConversionException {
try {
- return parseQuery(sqlStatement, queryParameters);
+ return convertToBeamRelInternal(sqlStatement, queryParameters);
} catch (RelConversionException e) {
throw new SqlConversionException(e.getCause());
}
@@ -115,20 +126,7 @@ public class ZetaSQLQueryPlanner implements QueryPlanner {
this.getClass().getCanonicalName()));
}
- public BeamRelNode convertToBeamRel(String sqlStatement, Map<String, Value> queryParams)
- throws ParseException, SqlConversionException {
- try {
- return parseQuery(sqlStatement, QueryParameters.ofNamed(queryParams));
- } catch (RelConversionException e) {
- throw new SqlConversionException(e.getCause());
- }
- }
-
- public BeamRelNode parseQuery(String sql) throws RelConversionException {
- return parseQuery(sql, QueryParameters.ofNone());
- }
-
- public BeamRelNode parseQuery(String sql, QueryParameters queryParams)
+ private BeamRelNode convertToBeamRelInternal(String sql, QueryParameters queryParams)
throws RelConversionException {
RelRoot root = plannerImpl.rel(sql, queryParams);
RelTraitSet desiredTraits =
@@ -149,8 +147,7 @@ public class ZetaSQLQueryPlanner implements QueryPlanner {
RelMetadataQuery.THREAD_PROVIDERS.set(
JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
root.rel.getCluster().invalidateMetadataQuery();
- BeamRelNode beamRelNode = (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
- return beamRelNode;
+ return (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
}
private FrameworkConfig defaultConfig(JdbcConnection connection, RuleSet[] ruleSets) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index eefd40b..5c7feb1 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -64,7 +64,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
-import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters.Kind;
import org.apache.beam.sdk.extensions.sql.zetasql.SqlOperatorRewriter;
import org.apache.beam.sdk.extensions.sql.zetasql.SqlOperators;
import org.apache.beam.sdk.extensions.sql.zetasql.SqlStdOperatorMappingTable;
@@ -1002,27 +1001,27 @@ public class ExpressionConverter {
}
private RexNode convertResolvedParameter(ResolvedParameter parameter) {
- if (queryParams.getKind() == Kind.NAMED) {
- Map<String, Value> queryParameterMap = (Map<String, Value>) queryParams.named();
- Value value = queryParameterMap.get(parameter.getName());
- Preconditions.checkState(
- parameter.getType().equals(value.getType()),
- String.format(
- "Expected resolved parameter %s to have type %s, but it has type %s",
- parameter.getName(), value.getType(), parameter.getType()));
- return convertValueToRexNode(value.getType(), value);
- } else if (queryParams.getKind() == Kind.POSITIONAL) {
- List<Value> queryParameterList = (List<Value>) queryParams.positional();
- // parameter is 1-indexed, while parameter list is 0-indexed.
- Value value = queryParameterList.get((int) parameter.getPosition() - 1);
- Preconditions.checkState(
- parameter.getType().equals(value.getType()),
- String.format(
- "Expected resolved parameter %d to have type %s, but it has type %s",
- parameter.getPosition(), value.getType(), parameter.getType()));
- return convertValueToRexNode(value.getType(), value);
+ Value value;
+ String identifier;
+ switch (queryParams.getKind()) {
+ case NAMED:
+ value = ((Map<String, Value>) queryParams.named()).get(parameter.getName());
+ identifier = parameter.getName();
+ break;
+ case POSITIONAL:
+ // parameter is 1-indexed, while parameter list is 0-indexed.
+ value = ((List<Value>) queryParams.positional()).get((int) parameter.getPosition() - 1);
+ identifier = Long.toString(parameter.getPosition());
+ break;
+ default:
+ throw new IllegalArgumentException("Found unexpected parameter " + parameter);
}
- throw new IllegalArgumentException("Found unexpected parameter " + parameter);
+ Preconditions.checkState(
+ parameter.getType().equals(value.getType()),
+ String.format(
+ "Expected resolved parameter %s to have type %s, but it has type %s",
+ identifier, value.getType(), parameter.getType()));
+ return convertValueToRexNode(value.getType(), value);
}
private RexNode convertResolvedStructFieldAccess(ResolvedGetStructField resolvedGetStructField) {
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
index bdfbf27..bf12bb7 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
@@ -58,7 +58,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
-import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -215,8 +214,7 @@ public class ZetaSQLDialectSpecTest {
ImmutableList<Value> params = ImmutableList.of(Value.createStringValue("abc\n"));
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode =
- zetaSQLQueryPlanner.convertToBeamRel(sql, QueryParameters.ofPositional(params));
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
final Schema schema = Schema.builder().addNullableField("ColA", FieldType.STRING).build();
@@ -332,8 +330,7 @@ public class ZetaSQLDialectSpecTest {
ImmutableList<Value> params =
ImmutableList.of(Value.createInt64Value(4L), Value.createInt64Value(5L));
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode =
- zetaSQLQueryPlanner.convertToBeamRel(sql, QueryParameters.ofPositional(params));
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
final Schema schema = Schema.builder().addNullableField("field1", FieldType.BOOLEAN).build();
@@ -431,8 +428,7 @@ public class ZetaSQLDialectSpecTest {
Value.createBoolValue(true), Value.createInt64Value(1), Value.createInt64Value(2));
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode =
- zetaSQLQueryPlanner.convertToBeamRel(sql, QueryParameters.ofPositional(params));
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
final Schema schema = Schema.builder().addNullableField("field1", FieldType.INT64).build();
@@ -3012,8 +3008,7 @@ public class ZetaSQLDialectSpecTest {
Value.createStringValue("c"));
ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode =
- zetaSQLQueryPlanner.convertToBeamRel(sql, QueryParameters.ofPositional(params));
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql, params);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
final Schema schema = Schema.builder().addStringField("field1").build();
PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues("abc").build());