You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/05/20 21:59:57 UTC

[beam] branch master updated: [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform (#11575)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9818fa8  [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform (#11575)
9818fa8 is described below

commit 9818fa823a4ef35840a281503b3402723b5c82fe
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed May 20 14:59:41 2020 -0700

    [BEAM-9699] Add test verifying we can use ZetaSQL in Python SqlTransform (#11575)
    
    * Add zetasql to SQL expansion service
    
    * Add support for ZetaSQL in Python SqlTransform
    
    plumbs through queryPlannerClassName param
    
    * Move dialect -> planner class resolution to Java
    
    Requires moving ExternalTransform classes to
    extensions/sql/expansion-service so we can reference ZetaSQL
    
    * add expansion-service package-info (checkstyle)
---
 .../extensions/sql/expansion-service/build.gradle  |  1 +
 .../expansion/ExternalSqlTransformRegistrar.java   | 82 ++++++++++++++++++++++
 .../sdk/extensions/sql/expansion/package-info.java | 20 ++++++
 .../beam/sdk/extensions/sql/SqlTransform.java      | 39 +---------
 sdks/python/apache_beam/transforms/sql.py          |  8 ++-
 sdks/python/apache_beam/transforms/sql_test.py     | 10 +++
 6 files changed, 119 insertions(+), 41 deletions(-)

diff --git a/sdks/java/extensions/sql/expansion-service/build.gradle b/sdks/java/extensions/sql/expansion-service/build.gradle
index 19a3270..998d3ca 100644
--- a/sdks/java/extensions/sql/expansion-service/build.gradle
+++ b/sdks/java/extensions/sql/expansion-service/build.gradle
@@ -30,6 +30,7 @@ ext.summary = """Contains code to run a SQL Expansion Service."""
 
 dependencies {
   compile project(path: ":sdks:java:extensions:sql")
+  compile project(path: ":sdks:java:extensions:sql:zetasql")
   compile project(path: ":sdks:java:expansion-service")
 }
 
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
new file mode 100644
index 0000000..cec0a86
--- /dev/null
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/ExternalSqlTransformRegistrar.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.expansion;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
+import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** Registers {@link SqlTransform} as an external transform for cross-language pipelines. */
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSqlTransformRegistrar implements ExternalTransformRegistrar {
+  private static final String URN = "beam:external:java:sql:v1";
+  // Supported dialect names (lowercase) mapped to the QueryPlanner implementing each.
+  private static final ImmutableMap<String, Class<? extends QueryPlanner>> DIALECTS =
+      ImmutableMap.of("zetasql", ZetaSQLQueryPlanner.class, "calcite", CalciteQueryPlanner.class);
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
+    return ImmutableMap.of(URN, Builder.class);
+  }
+
+  /** Configuration payload: the SQL {@code query} and an optional {@code dialect} name. */
+  public static class Configuration {
+    String query;
+    String dialect;
+
+    public void setQuery(String query) {
+      this.query = query;
+    }
+
+    public void setDialect(@Nullable String dialect) {
+      this.dialect = dialect;
+    }
+  }
+
+  private static class Builder
+      implements ExternalTransformBuilder<Configuration, PInput, PCollection<Row>> {
+    @Override
+    public PTransform<PInput, PCollection<Row>> buildExternal(Configuration configuration) {
+      SqlTransform transform = SqlTransform.query(configuration.query);
+      if (configuration.dialect != null) {
+        // Locale.ROOT: the default locale (e.g. Turkish) could lowercase "CALCITE" wrongly.
+        Class<? extends QueryPlanner> queryPlanner =
+            DIALECTS.get(configuration.dialect.toLowerCase(java.util.Locale.ROOT));
+        if (queryPlanner == null) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Received unknown SQL Dialect '%s'. Known dialects: %s",
+                  configuration.dialect, DIALECTS.keySet()));
+        }
+        transform = transform.withQueryPlannerClass(queryPlanner);
+      }
+      return transform;
+    }
+  }
+}
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/package-info.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/package-info.java
new file mode 100644
index 0000000..1dcf3fc
--- /dev/null
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** External Transform Registration for Beam SQL. */
+package org.apache.beam.sdk.extensions.sql.expansion;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index 496dd6d..12ce337 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
-import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.HashMap;
@@ -25,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.BeamSqlEnvBuilder;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
@@ -37,7 +35,6 @@ import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
@@ -273,8 +270,7 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
   }
 
   @AutoValue.Builder
-  abstract static class Builder
-      implements ExternalTransformBuilder<External.Configuration, PInput, PCollection<Row>> {
+  abstract static class Builder {
     abstract Builder setQueryString(String queryString);
 
     abstract Builder setQueryParameters(QueryParameters queryParameters);
@@ -292,19 +288,6 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
     abstract Builder setQueryPlannerClassName(@Nullable String queryPlannerClassName);
 
     abstract SqlTransform build();
-
-    @Override
-    public PTransform<PInput, PCollection<Row>> buildExternal(
-        External.Configuration configuration) {
-      return builder()
-          .setQueryString(configuration.query)
-          .setQueryParameters(QueryParameters.ofNone())
-          .setUdafDefinitions(Collections.emptyList())
-          .setUdfDefinitions(Collections.emptyList())
-          .setTableProviderMap(Collections.emptyMap())
-          .setAutoUdfUdafLoad(false)
-          .build();
-    }
   }
 
   @AutoValue
@@ -330,24 +313,4 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
       return new AutoValue_SqlTransform_UdafDefinition(udafName, combineFn);
     }
   }
-
-  @AutoService(ExternalTransformRegistrar.class)
-  public static class External implements ExternalTransformRegistrar {
-
-    private static final String URN = "beam:external:java:sql:v1";
-
-    @Override
-    public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {
-      return org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.of(
-          URN, AutoValue_SqlTransform.Builder.class);
-    }
-
-    public static class Configuration {
-      String query;
-
-      public void setQuery(String query) {
-        this.query = query;
-      }
-    }
-  }
 }
diff --git a/sdks/python/apache_beam/transforms/sql.py b/sdks/python/apache_beam/transforms/sql.py
index 8642376..ce409d2 100644
--- a/sdks/python/apache_beam/transforms/sql.py
+++ b/sdks/python/apache_beam/transforms/sql.py
@@ -32,7 +32,8 @@ from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
 __all__ = ['SqlTransform']
 
 SqlTransformSchema = typing.NamedTuple(
-    'SqlTransformSchema', [('query', unicode)])
+    'SqlTransformSchema', [('query', unicode),
+                           ('dialect', typing.Optional[unicode])])
 
 
 class SqlTransform(ExternalTransform):
@@ -66,9 +67,10 @@ class SqlTransform(ExternalTransform):
   """
   URN = 'beam:external:java:sql:v1'
 
-  def __init__(self, query):
+  def __init__(self, query, dialect=None):
     super(SqlTransform, self).__init__(
         self.URN,
-        NamedTupleBasedPayloadBuilder(SqlTransformSchema(query=query)),
+        NamedTupleBasedPayloadBuilder(
+            SqlTransformSchema(query=query, dialect=dialect)),
         BeamJarExpansionService(
             ':sdks:java:extensions:sql:expansion-service:shadowJar'))
diff --git a/sdks/python/apache_beam/transforms/sql_test.py b/sdks/python/apache_beam/transforms/sql_test.py
index b809714..d2271bb 100644
--- a/sdks/python/apache_beam/transforms/sql_test.py
+++ b/sdks/python/apache_beam/transforms/sql_test.py
@@ -138,6 +138,16 @@ class SqlTransformTest(unittest.TestCase):
               ON simple.`int` = enrich.`int`"""))
       assert_that(out, equal_to([(1, "a"), (26, "z"), (1, "a")]))
 
+  def test_zetasql_generate_data(self):
+    # Constant-only SELECT (no FROM clause) planned with the ZetaSQL dialect.
+    zetasql_query = """SELECT
+            CAST(1 AS INT64) AS `int`,
+            CAST('foo' AS STRING) AS `str`,
+            CAST(3.14  AS FLOAT64) AS `flt`"""
+    with TestPipeline() as pipeline:
+      rows = pipeline | SqlTransform(zetasql_query, dialect="zetasql")
+      assert_that(rows, equal_to([(1, "foo", 3.14)]))
+
 
 if __name__ == "__main__":
   logging.getLogger().setLevel(logging.INFO)