You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/05/20 21:59:57 UTC
[beam] branch master updated: [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform (#11575)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9818fa8 [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform (#11575)
9818fa8 is described below
commit 9818fa823a4ef35840a281503b3402723b5c82fe
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed May 20 14:59:41 2020 -0700
[BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform (#11575)
* Add the zetasql dependency to the SQL expansion service
* Add support for ZetaSQL in Python SqlTransform
  (initially by plumbing through a queryPlannerClassName parameter)
* Move dialect -> planner class resolution to Java
  This requires moving the ExternalTransform classes to
  extensions/sql/expansion-service so they can reference ZetaSQL.
* Add expansion-service package-info (required by checkstyle)
---
.../extensions/sql/expansion-service/build.gradle | 1 +
.../expansion/ExternalSqlTransformRegistrar.java | 82 ++++++++++++++++++++++
.../sdk/extensions/sql/expansion/package-info.java | 20 ++++++
.../beam/sdk/extensions/sql/SqlTransform.java | 39 +---------
sdks/python/apache_beam/transforms/sql.py | 8 ++-
sdks/python/apache_beam/transforms/sql_test.py | 10 +++
6 files changed, 119 insertions(+), 41 deletions(-)
diff --git a/sdks/java/extensions/sql/expansion-service/build.gradle b/sdks/java/extensions/sql/expansion-service/build.gradle
index 19a3270..998d3ca 100644
--- a/sdks/java/extensions/sql/expansion-service/build.gradle
+++ b/sdks/java/extensions/sql/expansion-service/build.gradle
@@ -30,6 +30,7 @@ ext.summary = """Contains code to run a SQL Expansion Service."""
dependencies {
compile project(path: ":sdks:java:extensions:sql")
+ compile project(path: ":sdks:java:extensions:sql:zetasql")
compile project(path: ":sdks:java:expansion-service")
}
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
new file mode 100644
index 0000000..cec0a86
--- /dev/null
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.expansion;
+
+import com.google.auto.service.AutoService;
+import java.util.Locale;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
+import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Registers {@link SqlTransform} with the expansion service under the URN {@code
+ * beam:external:java:sql:v1}, so that other SDKs (for example the Python {@code SqlTransform})
+ * can use it as a cross-language transform.
+ *
+ * <p>The optional {@code dialect} configuration value selects the {@link QueryPlanner}:
+ * {@code "zetasql"} selects {@link ZetaSQLQueryPlanner} and {@code "calcite"} selects {@link
+ * CalciteQueryPlanner}; names are matched case-insensitively. Without a dialect, the default
+ * planner of {@link SqlTransform} is used.
+ */
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar {
+  private static final String URN = "beam:external:java:sql:v1";
+
+  /** Maps lower-case dialect names to the {@link QueryPlanner} class each one selects. */
+  private static final ImmutableMap<String, Class<? extends QueryPlanner>> DIALECTS =
+      ImmutableMap.<String, Class<? extends QueryPlanner>>builder()
+          .put("zetasql", ZetaSQLQueryPlanner.class)
+          .put("calcite", CalciteQueryPlanner.class)
+          .build();
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+    return ImmutableMap.of(URN, Builder.class);
+  }
+
+  /**
+   * Configuration received from the cross-language caller. Its fields mirror the {@code query}
+   * and {@code dialect} fields of the Python {@code SqlTransformSchema}.
+   */
+  public static class Configuration {
+    String query;
+    @Nullable String dialect;
+
+    /** Sets the SQL query to expand. */
+    public void setQuery(String query) {
+      this.query = query;
+    }
+
+    /**
+     * Sets the SQL dialect name, or {@code null} to keep the default query planner of {@link
+     * SqlTransform}.
+     */
+    public void setDialect(@Nullable String dialect) {
+      this.dialect = dialect;
+    }
+  }
+
+  /** Builds a {@link SqlTransform} from a {@link Configuration}. */
+  private static class Builder
+      implements ExternalTransformBuilder<Configuration, PInput, PCollection<Row>> {
+    /**
+     * {@inheritDoc}
+     *
+     * @throws IllegalArgumentException if a dialect is given that is not a known dialect name
+     */
+    @Override
+    public PTransform<PInput, PCollection<Row>> buildExternal(Configuration configuration) {
+      SqlTransform transform = SqlTransform.query(configuration.query);
+      if (configuration.dialect != null) {
+        // Lower-case with Locale.ROOT so the lookup does not depend on the JVM default locale:
+        // under a Turkish locale "CALCITE".toLowerCase() contains a dotless 'i' and matches no key.
+        Class<? extends QueryPlanner> queryPlanner =
+            DIALECTS.get(configuration.dialect.toLowerCase(Locale.ROOT));
+        if (queryPlanner == null) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Received unknown SQL Dialect '%s'. Known dialects: %s",
+                  configuration.dialect, DIALECTS.keySet()));
+        }
+        transform = transform.withQueryPlannerClass(queryPlanner);
+      }
+      return transform;
+    }
+  }
+}
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/package-info.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/package-info.java
new file mode 100644
index 0000000..1dcf3fc
--- /dev/null
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** External Transform Registration for Beam SQL. */
+package org.apache.beam.sdk.extensions.sql.expansion;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index 496dd6d..12ce337 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql;
-import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.HashMap;
@@ -25,7 +24,6 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.BeamSqlEnvBuilder;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
@@ -37,7 +35,6 @@ import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
@@ -273,8 +270,7 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
}
@AutoValue.Builder
- abstract static class Builder
- implements ExternalTransformBuilder<External.Configuration, PInput, PCollection<Row>> {
+ abstract static class Builder {
abstract Builder setQueryString(String queryString);
abstract Builder setQueryParameters(QueryParameters queryParameters);
@@ -292,19 +288,6 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
abstract Builder setQueryPlannerClassName(@Nullable String queryPlannerClassName);
abstract SqlTransform build();
-
- @Override
- public PTransform<PInput, PCollection<Row>> buildExternal(
- External.Configuration configuration) {
- return builder()
- .setQueryString(configuration.query)
- .setQueryParameters(QueryParameters.ofNone())
- .setUdafDefinitions(Collections.emptyList())
- .setUdfDefinitions(Collections.emptyList())
- .setTableProviderMap(Collections.emptyMap())
- .setAutoUdfUdafLoad(false)
- .build();
- }
}
@AutoValue
@@ -330,24 +313,4 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
return new AutoValue_SqlTransform_UdafDefinition(udafName, combineFn);
}
}
-
- @AutoService(ExternalTransformRegistrar.class)
- public static class External implements ExternalTransformRegistrar {
-
- private static final String URN = "beam:external:java:sql:v1";
-
- @Override
- public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
- return org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.of(
- URN, AutoValue_SqlTransform.Builder.class);
- }
-
- public static class Configuration {
- String query;
-
- public void setQuery(String query) {
- this.query = query;
- }
- }
- }
}
diff --git a/sdks/python/apache_beam/transforms/sql.py b/sdks/python/apache_beam/transforms/sql.py
index 8642376..ce409d2 100644
--- a/sdks/python/apache_beam/transforms/sql.py
+++ b/sdks/python/apache_beam/transforms/sql.py
@@ -32,7 +32,8 @@ from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
__all__ = ['SqlTransform']
SqlTransformSchema = typing.NamedTuple(
- 'SqlTransformSchema', [('query', unicode)])
+ 'SqlTransformSchema', [('query', unicode),
+ ('dialect', typing.Optional[unicode])])  # None: no query planner override
class SqlTransform(ExternalTransform):
@@ -66,9 +67,10 @@ class SqlTransform(ExternalTransform):
"""
URN = 'beam:external:java:sql:v1'
- def __init__(self, query):
+ def __init__(self, query, dialect=None):  # dialect: 'zetasql', 'calcite', or None
super(SqlTransform, self).__init__(
self.URN,
- NamedTupleBasedPayloadBuilder(SqlTransformSchema(query=query)),
+ NamedTupleBasedPayloadBuilder(
+ SqlTransformSchema(query=query, dialect=dialect)),  # dialect -> planner resolved in Java
BeamJarExpansionService(
':sdks:java:extensions:sql:expansion-service:shadowJar'))
diff --git a/sdks/python/apache_beam/transforms/sql_test.py b/sdks/python/apache_beam/transforms/sql_test.py
index b809714..d2271bb 100644
--- a/sdks/python/apache_beam/transforms/sql_test.py
+++ b/sdks/python/apache_beam/transforms/sql_test.py
@@ -138,6 +138,16 @@ class SqlTransformTest(unittest.TestCase):
ON simple.`int` = enrich.`int`"""))
assert_that(out, equal_to([(1, "a"), (26, "z"), (1, "a")]))
+ def test_zetasql_generate_data(self):  # one row from literals via ZetaSQL
+ with TestPipeline() as p:
+ out = p | SqlTransform(
+ """SELECT
+ CAST(1 AS INT64) AS `int`,
+ CAST('foo' AS STRING) AS `str`,
+ CAST(3.14 AS FLOAT64) AS `flt`""",
+ dialect="zetasql")  # expands with ZetaSQLQueryPlanner
+ assert_that(out, equal_to([(1, "foo", 3.14)]))
+
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)