You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/02/04 18:45:55 UTC

[beam] branch master updated: Basic SchemaTransform implementation for SQLTransform. (#25177)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e6a3e41aba Basic SchemaTransform implementation for SQLTransform. (#25177)
4e6a3e41aba is described below

commit 4e6a3e41aba2f5b6cfae5469f2a1f530e0b8f9f0
Author: Byron Ellis <by...@gmail.com>
AuthorDate: Sat Feb 4 10:45:42 2023 -0800

    Basic SchemaTransform implementation for SQLTransform. (#25177)
    
    * Make the name of the desired pcollection name public so other people don't have to guess.
    
    * WIP SQLTransformSchemaTransformProvider. Should be able to provide access to most of the SQLTransform parameters, though probably not the UDF implementation or the error transform. That would require substantial refactoring/surgery I think.
    
    * WIP. Using enumerations to make the configuration schema more discoverable (particularly for tableproviders and query planners which could change over time)
    
    * Basic version of the SQLTransformSchemaTransformProvider. Has basically a complete configuration and I think a (hacky) approach to getting DLQ information out of the transform at least good enough to use for testing. There's no unit test coverage in this module so not clear the existing external transform is tested/working either.
    
    * Remove clutter
    
    * Add @Experimental annotation and always emit an errors PCollection even if there are no calculations.
    
    * Fix up checkstyle violation
---
 .../extensions/sql/expansion-service/build.gradle  |   1 +
 .../SqlTransformSchemaTransformProvider.java       | 232 +++++++++++++++++++++
 .../beam/sdk/extensions/sql/SqlTransform.java      |   3 +-
 3 files changed, 235 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/sql/expansion-service/build.gradle b/sdks/java/extensions/sql/expansion-service/build.gradle
index 261f83e7853..48f31c75128 100644
--- a/sdks/java/extensions/sql/expansion-service/build.gradle
+++ b/sdks/java/extensions/sql/expansion-service/build.gradle
@@ -38,6 +38,7 @@ dependencies {
   implementation project(path: ":sdks:java:extensions:sql")
   implementation project(path: ":sdks:java:extensions:sql:zetasql")
   implementation library.java.vendored_guava_26_0_jre
+
 }
 
 task runExpansionService (type: JavaExec) {
diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
new file mode 100644
index 00000000000..0649a0978e4
--- /dev/null
+++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.expansion;
+
+import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@Experimental
+@AutoService(SchemaTransformProvider.class)
+public class SqlTransformSchemaTransformProvider implements SchemaTransformProvider {
+
+  private static final Map<String, Class<? extends QueryPlanner>> QUERY_PLANNERS =
+      ImmutableMap.of(
+          "zetasql", org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.class,
+          "calcite", org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.class);
+  private static final EnumerationType QUERY_ENUMERATION =
+      EnumerationType.create(QUERY_PLANNERS.keySet().stream().collect(Collectors.toList()));
+
+  private static final OneOfType QUERY_PARAMETER =
+      OneOfType.create(
+          Schema.Field.nullable("string", Schema.FieldType.STRING),
+          Schema.Field.nullable("short", Schema.FieldType.INT16),
+          Schema.Field.nullable("int", Schema.FieldType.INT32),
+          Schema.Field.nullable("long", Schema.FieldType.INT64),
+          Schema.Field.nullable("float", Schema.FieldType.FLOAT),
+          Schema.Field.nullable("double", Schema.FieldType.DOUBLE),
+          Schema.Field.nullable("datetime", Schema.FieldType.DATETIME));
+
+  @Override
+  public String identifier() {
+    return "schematransform:org.apache.beam:sql_transform:v1";
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    List<String> providers = new ArrayList<>();
+    ServiceLoader.load(TableProvider.class)
+        .forEach(
+            (provider) -> {
+              providers.add(provider.getTableType());
+            });
+    EnumerationType providerEnum = EnumerationType.create(providers);
+
+    return Schema.of(
+        Schema.Field.of("query", Schema.FieldType.STRING),
+        Schema.Field.nullable(
+            "ddl", Schema.FieldType.STRING), // TODO: Underlying builder seems more capable?
+        Schema.Field.nullable("dialect", Schema.FieldType.logicalType(QUERY_ENUMERATION)),
+        Schema.Field.nullable("autoload", Schema.FieldType.BOOLEAN),
+        Schema.Field.nullable(
+            "tableproviders", Schema.FieldType.array(Schema.FieldType.logicalType(providerEnum))),
+        Schema.Field.nullable(
+            "parameters",
+            Schema.FieldType.logicalType(
+                OneOfType.create(
+                    Schema.Field.of(
+                        "positional",
+                        Schema.FieldType.array(Schema.FieldType.logicalType(QUERY_PARAMETER))),
+                    Schema.Field.of(
+                        "named",
+                        Schema.FieldType.array(Schema.FieldType.logicalType(QUERY_PARAMETER)))))));
+  }
+
+  @Override
+  public SchemaTransform from(Row configuration) {
+    return new SqlSchemaTransform(configuration);
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return ImmutableList.of("output", "errors");
+  }
+
+  static class ErrorCapture extends PTransform<PCollection<Row>, PDone> {
+
+    transient List<PCollection<Row>> inputs = new ArrayList<>();
+
+    public List<PCollection<Row>> getInputs() {
+      return inputs;
+    }
+
+    @Override
+    public PDone expand(PCollection<Row> input) {
+      inputs.add(input);
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  static class SqlSchemaTransform implements SchemaTransform {
+    final Row config;
+
+    public SqlSchemaTransform(Row config) {
+      this.config = config;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+        @Override
+        public PCollectionRowTuple expand(PCollectionRowTuple input) {
+
+          // Start with the query. In theory the exception can't be thrown, but all this nullness
+          // stuff
+          // isn't actually smart enough to know that. Could just cop and suppress that warning, but
+          // doing it the hard way for some reason.
+          String queryString = config.getString("query");
+          if (queryString == null) {
+            throw new IllegalArgumentException("Configuration must provide a query string.");
+          }
+          SqlTransform transform = SqlTransform.query(queryString);
+
+          // Allow setting the query planner class via the dialect name.
+          EnumerationType.Value dialect =
+              config.getLogicalTypeValue("dialect", EnumerationType.Value.class);
+          if (dialect != null) {
+            Class<? extends QueryPlanner> queryPlannerClass =
+                QUERY_PLANNERS.get(QUERY_ENUMERATION.toString(dialect));
+            if (queryPlannerClass != null) {
+              transform = transform.withQueryPlannerClass(queryPlannerClass);
+            }
+          }
+
+          // Add any DDL strings
+          String ddl = config.getString("ddl");
+          if (ddl != null) {
+            transform = transform.withDdlString(ddl);
+          }
+
+          // Check to see if we autoload or not
+          Boolean autoload = config.getBoolean("autoload");
+          if (autoload != null && autoload) {
+            transform = transform.withAutoLoading(true);
+          } else {
+            transform = transform.withAutoLoading(false);
+
+            // Add any user specified table providers from the set of available tableproviders.
+            Map<String, TableProvider> tableProviders = new HashMap<>();
+            ServiceLoader.load(TableProvider.class)
+                .forEach(
+                    (provider) -> {
+                      tableProviders.put(provider.getTableType(), provider);
+                    });
+            Collection<?> tableproviderList = config.getArray("tableproviders");
+            if (tableproviderList != null) {
+              for (Object nameObj : tableproviderList) {
+                if (nameObj != null) { // This actually could in theory be null...
+                  TableProvider p = tableProviders.get(nameObj);
+                  if (p
+                      != null) { // TODO: We ignore tableproviders that don't exist, we could change
+                    // that.
+                    transform = transform.withTableProvider(p.getTableType(), p);
+                  }
+                }
+              }
+            }
+          }
+
+          // TODO: Process query parameters. This is not necessary for Syndeo GA but would be
+          // really nice to have.
+
+          // TODO: See about reimplementing a correct version of SqlTransform
+          ErrorCapture errors = new ErrorCapture();
+          PCollection<Row> output = input.apply(transform.withErrorsTransformer(errors));
+
+          // TODO: One possibility for capturing the required tables would be to inject a
+          // tableprovider
+          // that we control and see which tables are requested during expansion. We could then
+          // modify the output schema to reflect these inputs via options for better validation.
+
+          List<PCollection<Row>> errorList = errors.getInputs();
+          if (errorList.size() == 0) {
+            PCollection<Row> emptyErrors =
+                input
+                    .getPipeline()
+                    .apply(Create.empty(BeamSqlRelUtils.getErrorRowSchema(Schema.of())));
+            return PCollectionRowTuple.of("output", output, "errors", emptyErrors);
+          } else if (errorList.size() == 1) {
+            return PCollectionRowTuple.of("output", output, "errors", errorList.get(0));
+          } else {
+            throw new UnsupportedOperationException(
+                "SqlTransform currently only supports a single dead letter queue collection");
+          }
+        }
+      };
+    }
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index e1f7f51129a..c7f1b38cc1b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -110,7 +110,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 @AutoValue
 @AutoValue.CopyAnnotations
 public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>> {
-  static final String PCOLLECTION_NAME = "PCOLLECTION";
+
+  public static final String PCOLLECTION_NAME = "PCOLLECTION";
 
   abstract String queryString();