Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/07 14:21:59 UTC

[GitHub] [beam] sclukas77 opened a new pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

sclukas77 opened a new pull request #12498:
URL: https://github.com/apache/beam/pull/12498


   Implemented ExternalSchemaIOTransformRegistrar, which uses ServiceLoader to find implementations of SchemaIOProvider so that the ExternalTransformBuilder can defer to the buildReader(..) and buildWriter(..) functions of the providers' SchemaIO subclasses. Also implemented JdbcSchemaIOProvider to test cross-language transforms using the new implementation.
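   The registration pattern described above can be sketched as follows. This is a simplified, hypothetical illustration: `Provider` and `RegistrarSketch` are stand-ins (not Beam's actual `SchemaIOProvider`/registrar API); only the URN format mirrors the PR.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.ServiceLoader;

   // Hypothetical stand-in for Beam's SchemaIOProvider interface.
   interface Provider {
     String identifier();
   }

   // Minimal sketch: discover providers with ServiceLoader and register a
   // read URN and a write URN for each one, as the description outlines.
   class RegistrarSketch {
     static String readUrn(String id) {
       return "beam:external:java:" + id + ":read:v1";
     }

     static String writeUrn(String id) {
       return "beam:external:java:" + id + ":write:v1";
     }

     static Map<String, Provider> knownBuilderInstances() {
       Map<String, Provider> builders = new LinkedHashMap<>();
       // In the real registrar each provider maps to a ReaderBuilder and a
       // WriterBuilder; here the provider itself stands in for both.
       for (Provider provider : ServiceLoader.load(Provider.class)) {
         builders.put(readUrn(provider.identifier()), provider);
         builders.put(writeUrn(provider.identifier()), provider);
       }
       return builders;
     }
   }
   ```

   With no `META-INF/services` entries on the classpath the map is simply empty; a provider registered with identifier `jdbc` would yield the URNs `beam:external:java:jdbc:read:v1` and `beam:external:java:jdbc:write:v1`.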
   
   R:@TheNeuralBit
   R:@robinyqiu
   ------------------------------------------------------------------------------------------------
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- | Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrases, statuses, and links of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   ![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r473186613



##########
File path: sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
##########
@@ -144,11 +144,11 @@ def test_xlang_jdbc_read(self):
       result = (
           p
           | 'Read from jdbc' >> ReadFromJdbc(
+              table_name=table_name,

Review comment:
       Done.







[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-677886094


   Run Portable_Python PreCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r467339563



##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -168,41 +182,33 @@ def __init__(
     :param expansion_service: The address (host:port) of the ExpansionService.
     """
 
-    super(WriteToJdbc, self).__init__(
-        self.URN,
-        NamedTupleBasedPayloadBuilder(
-            WriteToJdbcSchema(
-                driver_class_name=driver_class_name,
-                jdbc_url=jdbc_url,
-                username=username,
-                password=password,
-                statement=statement,
-                connection_properties=connection_properties,
-                connection_init_sqls=connection_init_sqls,
+        super(WriteToJdbc, self).__init__(
+            self.URN,
+            NamedTupleBasedPayloadBuilder(
+                ReadFromWriteToJdbcSchema(
+                    location=jdbc_url,

Review comment:
       Also for the shared configuration object we should disambiguate query and statement, maybe as read_query and write_statement. The parameters for ReadFromJdbc and WriteToJdbc could remain "query" and "statement" though since there it's clear from context.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r467112288



##########
File path: settings.gradle
##########
@@ -181,3 +181,4 @@ include "beam-test-tools"
 project(":beam-test-tools").dir = file(".test-infra/tools")
 include "beam-test-jenkins"
 project(":beam-test-jenkins").dir = file(".test-infra/jenkins")
+include 'sdks:java:extensions:schemaio-expansion-service'

Review comment:
       nit: can you move this up with the other :sdks:java entries and match the formatting

##########
File path: sdks/java/extensions/schemaio-expansion-service/build.gradle
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+        enableChecker:false,

Review comment:
       Can you remove this? We've disabled checker framework in most projects because we just added it and we're still cleaning up all the bugs it's finding, but for a new project we should turn checker framework on and fix any issues it finds. 

##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/package-info.java
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** External Transform Registration for SchemaIOs. */
+package org.apache.beam.sdk.extensions.schemaio.expansion;

Review comment:
       ```suggestion
   @DefaultQualifier(NonNull.class)
   package org.apache.beam.sdk.extensions.schemaio.expansion;
   
   import org.checkerframework.framework.qual.DefaultQualifier;
   import org.checkerframework.checker.nullness.qual.NonNull;
   ```
   
   We should make everything default to NonNull. After you do this you'll likely get a lot of build errors from checker framework that will require you to add `@Nullable` on anything that can be null.
   
   (See https://checkerframework.org/manual/#default-qualifier)

##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -168,41 +182,33 @@ def __init__(
     :param expansion_service: The address (host:port) of the ExpansionService.
     """
 
-    super(WriteToJdbc, self).__init__(
-        self.URN,
-        NamedTupleBasedPayloadBuilder(
-            WriteToJdbcSchema(
-                driver_class_name=driver_class_name,
-                jdbc_url=jdbc_url,
-                username=username,
-                password=password,
-                statement=statement,
-                connection_properties=connection_properties,
-                connection_init_sqls=connection_init_sqls,
+        super(WriteToJdbc, self).__init__(
+            self.URN,
+            NamedTupleBasedPayloadBuilder(
+                ReadFromWriteToJdbcSchema(
+                    location=jdbc_url,

Review comment:
       I think the better analog for "location" would actually be the table name, and the JDBC URL would be part of the configuration. Unfortunately it looks like the typical use-case (i.e. what we have in xlang_jdbcio_it_test) just has the table name implicitly in the query: https://github.com/apache/beam/blob/84cbd9bdfbe45221b8f3302cedad7bdb47cd2f5a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py#L151
   
   and in the statement:
   https://github.com/apache/beam/blob/84cbd9bdfbe45221b8f3302cedad7bdb47cd2f5a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py#L120
   
   Maybe what we should do here is require a `table_name` parameter that we will use for the location. Then, in the JdbcSchemaIOProvider, we can actually generate the query and statement. The user would still have the ability to override both of these if they prefer. WDYT?
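   A hedged sketch of what that suggestion could look like: derive a default read query and write statement from a required `table_name`, while still letting the user override both. The class and method names here are illustrative, not Beam's actual JdbcSchemaIOProvider API, and in practice the placeholder count would come from the value schema's field count.

   ```java
   // Illustrative defaults generated from a required table_name; users
   // could still pass an explicit query or statement to override these.
   class JdbcDefaultsSketch {
     static String defaultReadQuery(String tableName) {
       return "SELECT * FROM " + tableName;
     }

     // One "?" placeholder per field of the row schema being written.
     static String defaultWriteStatement(String tableName, int numFields) {
       StringBuilder statement =
           new StringBuilder("INSERT INTO ").append(tableName).append(" VALUES(");
       for (int i = 0; i < numFields; i++) {
         statement.append(i == 0 ? "?" : ", ?");
       }
       return statement.append(")").toString();
     }
   }
   ```

   For a three-field schema and table `tmp_table`, this yields `SELECT * FROM tmp_table` and `INSERT INTO tmp_table VALUES(?, ?, ?)`.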

##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.schemaio.expansion;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@Experimental(Experimental.Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegistrar {
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    try {
+      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":read:v1",
+            new ReaderBuilder(schemaIOProvider));
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":write:v1",
+            new WriterBuilder(schemaIOProvider));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e.getMessage());
+    }
+    return builder.build();
+  }
+
+  public static class Configuration {
+    String location = "";
+    byte[] config = new byte[0];
+    @Nullable byte[] dataSchema = null;
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
+
+    public void setConfig(byte[] config) {
+      this.config = config;
+    }
+
+    public void setDataSchema(byte[] dataSchema) {
+      this.dataSchema = dataSchema;
+    }
+  }
+
+  @Nullable
+  private static Schema translateSchema(@Nullable byte[] schemaBytes) throws Exception {
+    if (schemaBytes == null) {
+      return null;
+    }
+    SchemaApi.Schema protoSchema = SchemaApi.Schema.parseFrom(schemaBytes);
+    return SchemaTranslation.schemaFromProto(protoSchema);
+  }
+
+  private static Row translateRow(byte[] rowBytes, Schema configSchema) throws Exception {
+    RowCoder rowCoder = RowCoder.of(configSchema);
+    InputStream stream = new ByteArrayInputStream(rowBytes);
+    return rowCoder.decode(stream);
+  }
+
+  private static class ReaderBuilder
+      implements ExternalTransformBuilder<Configuration, PBegin, PCollection<Row>> {
+    SchemaIOProvider schemaIOProvider;
+
+    ReaderBuilder(SchemaIOProvider schemaIOProvider) {
+      this.schemaIOProvider = schemaIOProvider;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildExternal(Configuration configuration) {
+      try {
+        return schemaIOProvider
+            .from(
+                configuration.location,
+                translateRow(configuration.config, schemaIOProvider.configurationSchema()),
+                translateSchema(configuration.dataSchema))
+            .buildReader();
+      } catch (Exception e) {
+        throw new RuntimeException("Could not convert configuration proto to row or schema.");
+      }
+    }
+  }
+
+  private static class WriterBuilder
+      implements ExternalTransformBuilder<Configuration, PCollection<Row>, PDone> {
+    SchemaIOProvider schemaIOProvider;
+
+    WriterBuilder(SchemaIOProvider schemaIOProvider) {
+      this.schemaIOProvider = schemaIOProvider;
+    }
+
+    @Override
+    public PTransform<PCollection<Row>, PDone> buildExternal(Configuration configuration) {
+      try {
+        return (PTransform<PCollection<Row>, PDone>)
+            schemaIOProvider
+                .from(
+                    configuration.location,
+                    translateRow(configuration.config, schemaIOProvider.configurationSchema()),
+                    translateSchema(configuration.dataSchema))
+                .buildWriter();
+      } catch (Exception e) {
+        throw new RuntimeException("Could not convert configuration proto to row or schema.");

Review comment:
       This isn't ideal since it will swallow up any other exception that occurs inside the try block, and when it does catch something from translateRow or translateSchema it's not clear where the problem is. Could you instead add a try-catch in translateRow and translateSchema that catches the specific exception(s) that might be thrown, and re-throws them with a RuntimeException?
   
   When you re-throw you should always reference the original exception as well, like `throw new RuntimeException("Some additional context", e)`
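   A sketch of that re-throw pattern, assuming a decode step that throws a checked exception. The helper below is hypothetical (it stands in for `translateRow`'s call to `rowCoder.decode(...)`); the point is catching the specific exception close to its source and preserving the cause.

   ```java
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;

   // Catch the specific checked exception where it can occur and re-throw
   // with context *and* the original cause, so stack traces stay useful.
   class RethrowSketch {
     static String translate(byte[] bytes) {
       try {
         return decode(bytes); // stands in for rowCoder.decode(stream)
       } catch (IOException e) {
         throw new RuntimeException("Could not decode configuration row", e);
       }
     }

     // Hypothetical decode step that can fail with a checked exception.
     private static String decode(byte[] bytes) throws IOException {
       if (bytes == null) {
         throw new IOException("no bytes to decode");
       }
       return new String(bytes, StandardCharsets.UTF_8);
     }
   }
   ```

   A caller that hits the failure path sees both the added context and the original `IOException` as the cause, instead of a bare message that hides where the problem started.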

##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.schemaio.expansion;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@Experimental(Experimental.Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegistrar {
+  private static final String URN = "beam:external:java:schemaio:v1";

Review comment:
       I don't think this URN is used.

##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.schemaio.expansion;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@Experimental(Experimental.Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegistrar {
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    ImmutableMap.Builder builder = ImmutableMap.<String, ExternalTransformRegistrar>builder();
+    try {
+      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":read:v1",
+            new ReaderBuilder(schemaIOProvider));
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":write:v1",

Review comment:
       Let's make these URNs mention schemaio to distinguish them in case there are non-schemaio versions. Maybe `beam:external:java:schemaio:<identifier>`

##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.schemaio.expansion;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@Experimental(Experimental.Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegistrar {
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    ImmutableMap.Builder<String, ExternalTransformBuilder<?, ?, ?>> builder = ImmutableMap.builder();
+    try {
+      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":read:v1",
+            new ReaderBuilder(schemaIOProvider));
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":write:v1",
+            new WriterBuilder(schemaIOProvider));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return builder.build();
+  }
+
+  public static class Configuration {
+    String location = "";
+    byte[] config = new byte[0];
+    @Nullable byte[] dataSchema = null;
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
+
+    public void setConfig(byte[] config) {
+      this.config = config;
+    }
+
+    public void setDataSchema(byte[] dataSchema) {
+      this.dataSchema = dataSchema;
+    }
+  }
+
+  @Nullable
+  private static Schema translateSchema(@Nullable byte[] schemaBytes) throws Exception {
+    if (schemaBytes == null) {
+      return null;
+    }
+    SchemaApi.Schema protoSchema = SchemaApi.Schema.parseFrom(schemaBytes);
+    return SchemaTranslation.schemaFromProto(protoSchema);
+  }
+
+  private static Row translateRow(byte[] rowBytes, Schema configSchema) throws Exception {
+    RowCoder rowCoder = RowCoder.of(configSchema);
+    InputStream stream = new ByteArrayInputStream(rowBytes);
+    return rowCoder.decode(stream);
+  }
+
+  private static class ReaderBuilder
+      implements ExternalTransformBuilder<Configuration, PBegin, PCollection<Row>> {
+    SchemaIOProvider schemaIOProvider;
+
+    ReaderBuilder(SchemaIOProvider schemaIOProvider) {
+      this.schemaIOProvider = schemaIOProvider;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildExternal(Configuration configuration) {
+      try {
+        return schemaIOProvider
+            .from(
+                configuration.location,
+                translateRow(configuration.config, schemaIOProvider.configurationSchema()),
+                translateSchema(configuration.dataSchema))
+            .buildReader();
+      } catch (Exception e) {
+        throw new RuntimeException("Could not convert configuration proto to row or schema.", e);
+      }
+    }
+  }
+
+  private static class WriterBuilder

Review comment:
       There should probably be some unit tests that exercise WriterBuilder and ReaderBuilder. You could create instances that reference a mock SchemaIOProvider and verify they do what we expect.
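
   The mock-and-verify pattern the reviewer describes could look roughly like the sketch below. The real Beam interfaces and Mockito are not available here, so tiny stand-in types (`SchemaIO`, `SchemaIOProvider`, `FakeProvider`, `buildExternal`) illustrate the idea: the test hands the builder a fake provider that records its arguments, then asserts the builder deferred to it.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class ReaderBuilderSketch {
     // Minimal stand-ins for the Beam interfaces (illustrative only).
     interface SchemaIO {
       String buildReader(); // stands in for PTransform<PBegin, PCollection<Row>>
     }

     interface SchemaIOProvider {
       String identifier();
       SchemaIO from(String location);
     }

     // A hand-rolled fake that records how it was called, in place of Mockito.
     static class FakeProvider implements SchemaIOProvider {
       final List<String> locations = new ArrayList<>();

       @Override
       public String identifier() {
         return "fake";
       }

       @Override
       public SchemaIO from(String location) {
         locations.add(location);
         return () -> "reader:" + location;
       }
     }

     // Stand-in for ReaderBuilder.buildExternal: defer to the provider's SchemaIO.
     static String buildExternal(SchemaIOProvider provider, String location) {
       return provider.from(location).buildReader();
     }

     public static void main(String[] args) {
       FakeProvider provider = new FakeProvider();
       String reader = buildExternal(provider, "jdbc:postgresql://host/db");
       // Verify the builder consulted the provider with the configured location.
       if (!provider.locations.equals(List.of("jdbc:postgresql://host/db"))) {
         throw new AssertionError("provider not called with expected location");
       }
       if (!reader.equals("reader:jdbc:postgresql://host/db")) {
         throw new AssertionError("reader not built from provider's SchemaIO");
       }
       System.out.println("ok");
     }
   }
   ```

   A real test in this module would use Mockito (already in the build's testCompile dependencies) instead of a hand-rolled fake, and decode a `Configuration` proto rather than a bare location string.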




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-678557966


   :tada: thank you @sclukas77!





[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r468719581



##########
File path: settings.gradle
##########
@@ -181,3 +181,4 @@ include "beam-test-tools"
 project(":beam-test-tools").dir = file(".test-infra/tools")
 include "beam-test-jenkins"
 project(":beam-test-jenkins").dir = file(".test-infra/jenkins")
+include 'sdks:java:extensions:schemaio-expansion-service'

Review comment:
       Done







[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-678461477


   Run Python 3.8 PostCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r468722620



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading from and writing to databases with
+ * {@link JdbcIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class JdbcSchemaIOProvider implements SchemaIOProvider {
+
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "jdbc";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder()
+        .addStringField("driverClassName")
+        .addStringField("jdbcUrl")
+        .addStringField("username")
+        .addStringField("password")
+        .addNullableField("connectionProperties", FieldType.STRING)
+        .addNullableField("connectionInitSqls", FieldType.iterable(FieldType.STRING))
+        .addNullableField("readQuery", FieldType.STRING)
+        .addNullableField("writeStatement", FieldType.STRING)
+        .addNullableField("fetchSize", FieldType.INT16)
+        .addNullableField("outputParallelization", FieldType.BOOLEAN)
+        .build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public JdbcSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new JdbcSchemaIO(location, configuration);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return false;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  static class JdbcSchemaIO implements SchemaIO, Serializable {
+    protected final Row config;
+    protected final String location;
+
+    JdbcSchemaIO(String location, Row config) {
+      this.config = config;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return null;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      String readQuery;
+      if (config.getString("readQuery") != null) {
+        readQuery = config.getString("readQuery");
+      } else {
+        readQuery = String.format("SELECT f_int FROM %s", location);

Review comment:
       Similarly this shouldn't mention specific field names. In this case I think we could just do `SELECT * FROM %s`
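
   The suggested fallback amounts to defaulting to a `SELECT *` query over the configured table when no explicit `readQuery` is set. A minimal sketch (method and parameter names are illustrative, not Beam API):

   ```java
   public class ReadQuerySketch {
     // Fall back to "SELECT * FROM <table>" when no explicit query is configured,
     // so the default does not depend on any specific field names.
     static String readQueryOrDefault(String configuredQuery, String location) {
       return configuredQuery != null
           ? configuredQuery
           : String.format("SELECT * FROM %s", location);
     }

     public static void main(String[] args) {
       System.out.println(readQueryOrDefault(null, "my_table"));
       System.out.println(readQueryOrDefault("SELECT id FROM my_table", "my_table"));
     }
   }
   ```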

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading from and writing to databases with
+ * {@link JdbcIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class JdbcSchemaIOProvider implements SchemaIOProvider {
+
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "jdbc";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder()
+        .addStringField("driverClassName")
+        .addStringField("jdbcUrl")
+        .addStringField("username")
+        .addStringField("password")
+        .addNullableField("connectionProperties", FieldType.STRING)
+        .addNullableField("connectionInitSqls", FieldType.iterable(FieldType.STRING))
+        .addNullableField("readQuery", FieldType.STRING)
+        .addNullableField("writeStatement", FieldType.STRING)
+        .addNullableField("fetchSize", FieldType.INT16)
+        .addNullableField("outputParallelization", FieldType.BOOLEAN)
+        .build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public JdbcSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new JdbcSchemaIO(location, configuration);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return false;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  static class JdbcSchemaIO implements SchemaIO, Serializable {
+    protected final Row config;
+    protected final String location;
+
+    JdbcSchemaIO(String location, Row config) {
+      this.config = config;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return null;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      String readQuery;
+      if (config.getString("readQuery") != null) {
+        readQuery = config.getString("readQuery");
+      } else {
+        readQuery = String.format("SELECT f_int FROM %s", location);
+      }
+
+      JdbcIO.DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration();
+
+      JdbcIO.ReadRows readRows =
+          JdbcIO.readRows()
+              .withDataSourceConfiguration(dataSourceConfiguration)
+              .withQuery(readQuery);
+
+      if (config.getInt16("fetchSize") != null) {
+        readRows = readRows.withFetchSize(config.getInt16("fetchSize"));
+      }
+      if (config.getBoolean("outputParallelization") != null) {
+        readRows = readRows.withOutputParallelization(config.getBoolean("outputParallelization"));
+      }
+      return readRows;
+    }
+
+    @Override
+    public PTransform<PCollection<Row>, PDone> buildWriter() {
+      String writeStatement;
+      if (config.getString("writeStatement") != null) {
+        writeStatement = config.getString("writeStatement");
+      } else {
+        writeStatement = String.format("INSERT INTO %s VALUES(?, ?, ?)", location);

Review comment:
       The number of ?s in this query should depend on the number of fields in the schema of the input PCollection. `(?, ?, ?)` will only work when writing exactly three fields. I'd suggest returning a PTransform that looks at the input PCollection in its expand method. Something like this:
   
   ```java
   return new PTransform<PCollection<Row>, PDone>() {
     public PDone expand(PCollection<Row> input) {
       Schema schema = input.getSchema();
       // Generate writeStatement based on input schema
       
       return input.apply(JdbcIO.<Row>write()...);
     }
    };
   ```
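
   Generating the statement from the input schema could be sketched as below. `numFields` stands in for `input.getSchema().getFieldCount()`, and the method name is illustrative, not part of the PR:

   ```java
   import java.util.Collections;

   public class WriteStatementSketch {
     // Build "INSERT INTO <table> VALUES(?, ..., ?)" with exactly one
     // placeholder per field in the input schema, instead of hard-coding three.
     static String generateWriteStatement(String table, int numFields) {
       String placeholders = String.join(", ", Collections.nCopies(numFields, "?"));
       return String.format("INSERT INTO %s VALUES(%s)", table, placeholders);
     }

     public static void main(String[] args) {
       System.out.println(generateWriteStatement("my_table", 3));
     }
   }
   ```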







[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r468718319



##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -168,41 +182,33 @@ def __init__(
     :param expansion_service: The address (host:port) of the ExpansionService.
     """
 
-    super(WriteToJdbc, self).__init__(
-        self.URN,
-        NamedTupleBasedPayloadBuilder(
-            WriteToJdbcSchema(
-                driver_class_name=driver_class_name,
-                jdbc_url=jdbc_url,
-                username=username,
-                password=password,
-                statement=statement,
-                connection_properties=connection_properties,
-                connection_init_sqls=connection_init_sqls,
+        super(WriteToJdbc, self).__init__(
+            self.URN,
+            NamedTupleBasedPayloadBuilder(
+                ReadFromWriteToJdbcSchema(
+                    location=jdbc_url,

Review comment:
       That makes sense to me. I added in the query/statement generator code and modified the existing test to use the default generated query/statement, but I think we should add in overriding query/statement tests too.







[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r468718529



##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/package-info.java
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** External Transform Registration for SchemaIOs. */
+package org.apache.beam.sdk.extensions.schemaio.expansion;

Review comment:
       Done







[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r473186190



##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -96,25 +98,37 @@
 
 
 def default_io_expansion_service():
-  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+    return BeamJarExpansionService(
+        ':sdks:java:extensions:schemaio-expansion-service:shadowJar')
 
 
-WriteToJdbcSchema = typing.NamedTuple(
-    'WriteToJdbcSchema',
+ReadFromWriteToJdbcSchema = typing.NamedTuple(
+    'ReadFromWriteToJdbcSchema',
+    [
+        ('location', unicode),
+        ('config', bytes)
+    ],
+)
+
+Config = typing.NamedTuple(
+    'Config',
     [
         ('driver_class_name', unicode),
         ('jdbc_url', unicode),
         ('username', unicode),
         ('password', unicode),
         ('connection_properties', typing.Optional[unicode]),
         ('connection_init_sqls', typing.Optional[typing.List[unicode]]),
-        ('statement', unicode),
+        ('write_statement', typing.Optional[unicode]),
+        ('read_query', typing.Optional[unicode]),
+        ('fetch_size', typing.Optional[int]),
+        ('output_parallelization', typing.Optional[bool]),
     ],
 )
 
 
 class WriteToJdbc(ExternalTransform):
-  """A PTransform which writes Rows to the specified database via JDBC.
+    """A PTransform which writes Rows to the specified database via JDBC.

Review comment:
       Done.







[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-678539599


   Run Python 3.8 PostCommit





[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r473182309



##########
File path: sdks/java/extensions/schemaio-expansion-service/build.gradle
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+        ignoreRawtypeErrors: true,
+        automaticModuleName: 'org.apache.beam.sdk.extensions.schemaio.expansion',
+        validateShadowJar: false,
+        shadowClosure: {
+            manifest {
+                attributes "Main-Class": "org.apache.beam.sdk.expansion.service.ExpansionService"
+            }
+        }
+)
+
+dependencies {
+    compile project(path: ":sdks:java:expansion-service")
+    compile project(":sdks:java:io:jdbc")
+    compile library.java.postgres
+    testCompile library.java.junit
+    testCompile library.java.powermock_mockito
+    testCompile library.java.mockito_core
+    testCompile "org.checkerframework:checker-qual:3.5.0"
+}
+
+task runExpansionService (type: JavaExec) {
+    main = "org.apache.beam.sdk.expansion.service.ExpansionService"
+    classpath = sourceSets.main.runtimeClasspath
+    args = [project.findProperty(":constructionService.port") ?: "8097"]
+}

Review comment:
       Done.







[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r468719290



##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.schemaio.expansion;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@Experimental(Experimental.Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegistrar {
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    ImmutableMap.Builder<String, ExternalTransformBuilder<?, ?, ?>> builder = ImmutableMap.builder();
+    try {
+      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":read:v1",
+            new ReaderBuilder(schemaIOProvider));
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":write:v1",

Review comment:
       Done







[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r470865279



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading from and writing to databases with
+ * {@link JdbcIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class JdbcSchemaIOProvider implements SchemaIOProvider {
+
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "jdbc";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder()
+        .addStringField("driverClassName")
+        .addStringField("jdbcUrl")
+        .addStringField("username")
+        .addStringField("password")
+        .addNullableField("connectionProperties", FieldType.STRING)
+        .addNullableField("connectionInitSqls", FieldType.iterable(FieldType.STRING))
+        .addNullableField("readQuery", FieldType.STRING)
+        .addNullableField("writeStatement", FieldType.STRING)
+        .addNullableField("fetchSize", FieldType.INT16)
+        .addNullableField("outputParallelization", FieldType.BOOLEAN)
+        .build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public JdbcSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new JdbcSchemaIO(location, configuration);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return false;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  static class JdbcSchemaIO implements SchemaIO, Serializable {
+    protected final Row config;
+    protected final String location;
+
+    JdbcSchemaIO(String location, Row config) {
+      this.config = config;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return null;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      String readQuery;
+      if (config.getString("readQuery") != null) {
+        readQuery = config.getString("readQuery");
+      } else {
+        readQuery = String.format("SELECT f_int FROM %s", location);
+      }
+
+      JdbcIO.DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration();
+
+      JdbcIO.ReadRows readRows =
+          JdbcIO.readRows()
+              .withDataSourceConfiguration(dataSourceConfiguration)
+              .withQuery(readQuery);
+
+      if (config.getInt16("fetchSize") != null) {
+        readRows = readRows.withFetchSize(config.getInt16("fetchSize"));
+      }
+      if (config.getBoolean("outputParallelization") != null) {
+        readRows = readRows.withOutputParallelization(config.getBoolean("outputParallelization"));
+      }
+      return readRows;
+    }
+
+    @Override
+    public PTransform<PCollection<Row>, PDone> buildWriter() {
+      String writeStatement;
+      if (config.getString("writeStatement") != null) {
+        writeStatement = config.getString("writeStatement");
+      } else {
+        writeStatement = String.format("INSERT INTO %s VALUES(?, ?, ?)", location);

Review comment:
       Done.

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing rows to a database via
+ * {@link JdbcIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class JdbcSchemaIOProvider implements SchemaIOProvider {
+
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "jdbc";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder()
+        .addStringField("driverClassName")
+        .addStringField("jdbcUrl")
+        .addStringField("username")
+        .addStringField("password")
+        .addNullableField("connectionProperties", FieldType.STRING)
+        .addNullableField("connectionInitSqls", FieldType.iterable(FieldType.STRING))
+        .addNullableField("readQuery", FieldType.STRING)
+        .addNullableField("writeStatement", FieldType.STRING)
+        .addNullableField("fetchSize", FieldType.INT16)
+        .addNullableField("outputParallelization", FieldType.BOOLEAN)
+        .build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public JdbcSchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new JdbcSchemaIO(location, configuration);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return false;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema-aware IOs. */
+  static class JdbcSchemaIO implements SchemaIO, Serializable {
+    protected final Row config;
+    protected final String location;
+
+    JdbcSchemaIO(String location, Row config) {
+      this.config = config;
+      this.location = location;
+    }
+
+    @Override
+    public Schema schema() {
+      return null;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      String readQuery;
+      if (config.getString("readQuery") != null) {
+        readQuery = config.getString("readQuery");
+      } else {
+        readQuery = String.format("SELECT f_int FROM %s", location);

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r467335905



##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -168,41 +182,33 @@ def __init__(
     :param expansion_service: The address (host:port) of the ExpansionService.
     """
 
-    super(WriteToJdbc, self).__init__(
-        self.URN,
-        NamedTupleBasedPayloadBuilder(
-            WriteToJdbcSchema(
-                driver_class_name=driver_class_name,
-                jdbc_url=jdbc_url,
-                username=username,
-                password=password,
-                statement=statement,
-                connection_properties=connection_properties,
-                connection_init_sqls=connection_init_sqls,
+        super(WriteToJdbc, self).__init__(
+            self.URN,
+            NamedTupleBasedPayloadBuilder(
+                ReadFromWriteToJdbcSchema(
+                    location=jdbc_url,

Review comment:
       I think the better analog for "location" would actually be the table name, and the JDBC URL would be part of the configuration. Unfortunately it looks like the typical use-case (i.e. what we have in xlang_jdbcio_it_test) just has the table name implicitly in the query: https://github.com/apache/beam/blob/84cbd9bdfbe45221b8f3302cedad7bdb47cd2f5a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py#L151
   
   and in the statement:
   https://github.com/apache/beam/blob/84cbd9bdfbe45221b8f3302cedad7bdb47cd2f5a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py#L120
   
   Maybe what we should do here is require a `table_name` parameter that we will use for the location. Then, in the JdbcSchemaIOProvider, we can actually generate the query and statement. The user would still have the ability to override both query and statement if they prefer. WDYT?
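
   The defaulting behavior proposed above can be sketched in a few lines. This is a stdlib-only Python illustration, not the PR's actual implementation; the function names and the `SELECT *` default are placeholders:

   ```python
   def default_read_query(table_name, read_query=None):
       """Use the caller's query if given, else generate one from the table name."""
       if read_query is not None:
           return read_query
       return "SELECT * FROM %s" % table_name


   def default_write_statement(table_name, num_columns, write_statement=None):
       """Use the caller's statement if given, else generate an INSERT with one
       placeholder per column."""
       if write_statement is not None:
           return write_statement
       placeholders = ", ".join(["?"] * num_columns)
       return "INSERT INTO %s VALUES(%s)" % (table_name, placeholders)
   ```

   Either way the user keeps the ability to override both the query and the statement, which is the point of making them optional configuration fields.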







[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-677914468


   Run Python 3.8 PostCommit





[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r473182547



##########
File path: sdks/java/extensions/schemaio-expansion-service/build.gradle
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+        ignoreRawtypeErrors: true,
+        automaticModuleName: 'org.apache.beam.sdk.extensions.schemaio.expansion',
+        validateShadowJar: false,
+        shadowClosure: {
+            manifest {
+                attributes "Main-Class": "org.apache.beam.sdk.expansion.service.ExpansionService"
+            }
+        }
+)
+
+dependencies {
+    compile project(path: ":sdks:java:expansion-service")
+    compile project(":sdks:java:io:jdbc")
+    compile library.java.postgres
+    testCompile library.java.junit
+    testCompile library.java.powermock_mockito
+    testCompile library.java.mockito_core
+    testCompile "org.checkerframework:checker-qual:3.5.0"

Review comment:
       Done.







[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r468718975



##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.schemaio.expansion;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@Experimental(Experimental.Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegistrar {
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    ImmutableMap.Builder<String, ExternalTransformBuilder<?, ?, ?>> builder =
+        ImmutableMap.builder();
+    try {
+      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":read:v1",
+            new ReaderBuilder(schemaIOProvider));
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":write:v1",
+            new WriterBuilder(schemaIOProvider));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return builder.build();
+  }
+
+  public static class Configuration {
+    String location = "";
+    byte[] config = new byte[0];
+    @Nullable byte[] dataSchema = null;
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
+
+    public void setConfig(byte[] config) {
+      this.config = config;
+    }
+
+    public void setDataSchema(byte[] dataSchema) {
+      this.dataSchema = dataSchema;
+    }
+  }
+
+  @Nullable
+  private static Schema translateSchema(@Nullable byte[] schemaBytes) throws Exception {
+    if (schemaBytes == null) {
+      return null;
+    }
+    SchemaApi.Schema protoSchema = SchemaApi.Schema.parseFrom(schemaBytes);
+    return SchemaTranslation.schemaFromProto(protoSchema);
+  }
+
+  private static Row translateRow(byte[] rowBytes, Schema configSchema) throws Exception {
+    RowCoder rowCoder = RowCoder.of(configSchema);
+    InputStream stream = new ByteArrayInputStream(rowBytes);
+    return rowCoder.decode(stream);
+  }
+
+  private static class ReaderBuilder
+      implements ExternalTransformBuilder<Configuration, PBegin, PCollection<Row>> {
+    SchemaIOProvider schemaIOProvider;
+
+    ReaderBuilder(SchemaIOProvider schemaIOProvider) {
+      this.schemaIOProvider = schemaIOProvider;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildExternal(Configuration configuration) {
+      try {
+        return schemaIOProvider
+            .from(
+                configuration.location,
+                translateRow(configuration.config, schemaIOProvider.configurationSchema()),
+                translateSchema(configuration.dataSchema))
+            .buildReader();
+      } catch (Exception e) {
+        throw new RuntimeException("Could not convert configuration proto to row or schema.", e);
+      }
+    }
+  }
+
+  private static class WriterBuilder
+      implements ExternalTransformBuilder<Configuration, PCollection<Row>, PDone> {
+    SchemaIOProvider schemaIOProvider;
+
+    WriterBuilder(SchemaIOProvider schemaIOProvider) {
+      this.schemaIOProvider = schemaIOProvider;
+    }
+
+    @Override
+    public PTransform<PCollection<Row>, PDone> buildExternal(Configuration configuration) {
+      try {
+        return (PTransform<PCollection<Row>, PDone>)
+            schemaIOProvider
+                .from(
+                    configuration.location,
+                    translateRow(configuration.config, schemaIOProvider.configurationSchema()),
+                    translateSchema(configuration.dataSchema))
+                .buildWriter();
+      } catch (Exception e) {
+        throw new RuntimeException("Could not convert configuration proto to row or schema.", e);

Review comment:
       Done
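
       The registrar quoted above derives one read URN and one write URN per provider discovered by ServiceLoader. A stdlib-only Python sketch of that naming scheme follows; the provider list and the "reader"/"writer" placeholder values stand in for the real builder instances:

       ```python
       def known_builder_urns(provider_identifiers):
           """Mirror knownBuilderInstances(): map each provider id to a read URN
           and a write URN using Beam's external-transform naming convention."""
           urns = {}
           for identifier in provider_identifiers:
               urns["beam:external:java:%s:read:v1" % identifier] = "reader"
               urns["beam:external:java:%s:write:v1" % identifier] = "writer"
           return urns
       ```

       With the JdbcSchemaIOProvider on the classpath, this yields the `beam:external:java:jdbc:read:v1` and `beam:external:java:jdbc:write:v1` URNs the Python SDK addresses.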







[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r468720134



##########
File path: sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.schemaio.expansion;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.ServiceLoader;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+@Experimental(Experimental.Kind.PORTABILITY)
+@AutoService(ExternalTransformRegistrar.class)
+public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegistrar {
+
+  @Override
+  public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
+    ImmutableMap.Builder<String, ExternalTransformBuilder<?, ?, ?>> builder =
+        ImmutableMap.builder();
+    try {
+      for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":read:v1",
+            new ReaderBuilder(schemaIOProvider));
+        builder.put(
+            "beam:external:java:" + schemaIOProvider.identifier() + ":write:v1",
+            new WriterBuilder(schemaIOProvider));
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return builder.build();
+  }
+
+  public static class Configuration {
+    String location = "";
+    byte[] config = new byte[0];
+    @Nullable byte[] dataSchema = null;
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
+
+    public void setConfig(byte[] config) {
+      this.config = config;
+    }
+
+    public void setDataSchema(byte[] dataSchema) {
+      this.dataSchema = dataSchema;
+    }
+  }
+
+  @Nullable
+  private static Schema translateSchema(@Nullable byte[] schemaBytes) throws Exception {
+    if (schemaBytes == null) {
+      return null;
+    }
+    SchemaApi.Schema protoSchema = SchemaApi.Schema.parseFrom(schemaBytes);
+    return SchemaTranslation.schemaFromProto(protoSchema);
+  }
+
+  private static Row translateRow(byte[] rowBytes, Schema configSchema) throws Exception {
+    RowCoder rowCoder = RowCoder.of(configSchema);
+    InputStream stream = new ByteArrayInputStream(rowBytes);
+    return rowCoder.decode(stream);
+  }
+
+  private static class ReaderBuilder
+      implements ExternalTransformBuilder<Configuration, PBegin, PCollection<Row>> {
+    SchemaIOProvider schemaIOProvider;
+
+    ReaderBuilder(SchemaIOProvider schemaIOProvider) {
+      this.schemaIOProvider = schemaIOProvider;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildExternal(Configuration configuration) {
+      try {
+        return schemaIOProvider
+            .from(
+                configuration.location,
+                translateRow(configuration.config, schemaIOProvider.configurationSchema()),
+                translateSchema(configuration.dataSchema))
+            .buildReader();
+      } catch (Exception e) {
+        throw new RuntimeException("Could not convert configuration proto to row or schema.", e);
+      }
+    }
+  }
+
+  private static class WriterBuilder

Review comment:
       ACK, I will add these in.
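
       The translateSchema helper in this diff treats a missing payload as "no data schema" (requiresDataSchema() is false for jdbc). A hedged sketch of that null-propagation pattern, using json.loads purely as a stand-in for the real SchemaApi proto parsing, which needs Beam on the classpath:

       ```python
       import json


       def translate_schema(schema_bytes):
           """Return None when no schema payload was supplied; otherwise decode it.
           json stands in here for SchemaApi.Schema.parseFrom in the Java original."""
           if schema_bytes is None:
               return None
           return json.loads(schema_bytes.decode("utf-8"))
       ```

       Keeping the None check in one place means both ReaderBuilder and WriterBuilder can pass the optional dataSchema straight through to SchemaIOProvider.from().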







[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-677831174


   Run Spotless PreCommit





[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-670578572









[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-670580853


   Run Python 3.8 PostCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r470904963



##########
File path: sdks/java/extensions/schemaio-expansion-service/src/test/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrarTest.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.schemaio.expansion;
+
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.extensions.schemaio.expansion.ExternalSchemaIOTransformRegistrar.Configuration;
+import org.apache.beam.sdk.extensions.schemaio.expansion.ExternalSchemaIOTransformRegistrar.ReaderBuilder;
+import org.apache.beam.sdk.extensions.schemaio.expansion.ExternalSchemaIOTransformRegistrar.WriterBuilder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Unit tests for {@link ExternalSchemaIOTransformRegistrar}. */
+@RunWith(JUnit4.class)
+public class ExternalSchemaIOTransformRegistrarTest {
+  String location = "test";
+  Schema validDataSchema = Schema.builder().addStringField("dataField").build();
+  Schema validConfigSchema = Schema.builder().addStringField("configField").build();
+  Row validConfigRow = Row.withSchema(validConfigSchema).addValue("value").build();
+
+  byte[] validSchemaBytes = SchemaTranslation.schemaToProto(validDataSchema, true).toByteArray();
+  byte[] invalidBytes = "Nice try".getBytes(Charset.defaultCharset());

Review comment:
       Nice :)

##########
File path: sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
##########
@@ -144,11 +144,11 @@ def test_xlang_jdbc_read(self):
       result = (
           p
           | 'Read from jdbc' >> ReadFromJdbc(
+              table_name=table_name,

Review comment:
       Ideally we would keep both sets of tests, one that relies on just the table name, and one that sets the SELECT/INSERT statement. We can leave this for a follow-up though if you file a jira and add a TODO

##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -96,25 +98,37 @@
 
 
 def default_io_expansion_service():
-  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+    return BeamJarExpansionService(
+        ':sdks:java:extensions:schemaio-expansion-service:shadowJar')
 
 
-WriteToJdbcSchema = typing.NamedTuple(
-    'WriteToJdbcSchema',
+ReadFromWriteToJdbcSchema = typing.NamedTuple(

Review comment:
       Filed BEAM-10709 for this

##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -96,25 +98,37 @@
 
 
 def default_io_expansion_service():
-  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+    return BeamJarExpansionService(
+        ':sdks:java:extensions:schemaio-expansion-service:shadowJar')
 
 
-WriteToJdbcSchema = typing.NamedTuple(
-    'WriteToJdbcSchema',
+ReadFromWriteToJdbcSchema = typing.NamedTuple(
+    'ReadFromWriteToJdbcSchema',
+    [
+        ('location', unicode),
+        ('config', bytes)
+    ],
+)
+
+Config = typing.NamedTuple(
+    'Config',
     [
         ('driver_class_name', unicode),
         ('jdbc_url', unicode),
         ('username', unicode),
         ('password', unicode),
         ('connection_properties', typing.Optional[unicode]),
         ('connection_init_sqls', typing.Optional[typing.List[unicode]]),
-        ('statement', unicode),
+        ('write_statement', typing.Optional[unicode]),
+        ('read_query', typing.Optional[unicode]),
+        ('fetch_size', typing.Optional[int]),
+        ('output_parallelization', typing.Optional[bool]),
     ],
 )
 
 
 class WriteToJdbc(ExternalTransform):
-  """A PTransform which writes Rows to the specified database via JDBC.
+    """A PTransform which writes Rows to the specified database via JDBC.

Review comment:
       nit: looks like there's an extraneous whitespace change here.
   
   Please update this docstring (and the Read counterpart) based on our change to table_name/statement/query. It should point out that table_name is required and statement/query are optional, but can be overriden

##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -96,25 +98,37 @@
 
 
 def default_io_expansion_service():
-  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+    return BeamJarExpansionService(
+        ':sdks:java:extensions:schemaio-expansion-service:shadowJar')
 
 
-WriteToJdbcSchema = typing.NamedTuple(
-    'WriteToJdbcSchema',
+ReadFromWriteToJdbcSchema = typing.NamedTuple(

Review comment:
       nit: WDYT about calling this JdbcConfigSchema?
   

##########
File path: sdks/java/extensions/schemaio-expansion-service/build.gradle
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+        ignoreRawtypeErrors: true,
+        automaticModuleName: 'org.apache.beam.sdk.extensions.schemaio.expansion',
+        validateShadowJar: false,
+        shadowClosure: {
+            manifest {
+                attributes "Main-Class": "org.apache.beam.sdk.expansion.service.ExpansionService"
+            }
+        }
+)
+
+dependencies {
+    compile project(path: ":sdks:java:expansion-service")
+    compile project(":sdks:java:io:jdbc")
+    compile library.java.postgres
+    testCompile library.java.junit
+    testCompile library.java.powermock_mockito
+    testCompile library.java.mockito_core
+    testCompile "org.checkerframework:checker-qual:3.5.0"

Review comment:
       Assuming this is because of BEAM-10632? Let's add a TODO here so we remember to remove it later
   ```suggestion
       testCompile library.java.mockito_core
       // TODO(BEAM-10632): remove this dependency
       testCompile "org.checkerframework:checker-qual:3.5.0"
   ```

##########
File path: sdks/java/extensions/schemaio-expansion-service/build.gradle
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+        ignoreRawtypeErrors: true,
+        automaticModuleName: 'org.apache.beam.sdk.extensions.schemaio.expansion',
+        validateShadowJar: false,
+        shadowClosure: {
+            manifest {
+                attributes "Main-Class": "org.apache.beam.sdk.expansion.service.ExpansionService"
+            }
+        }
+)
+
+dependencies {
+    compile project(path: ":sdks:java:expansion-service")
+    compile project(":sdks:java:io:jdbc")
+    compile library.java.postgres
+    testCompile library.java.junit
+    testCompile library.java.powermock_mockito
+    testCompile library.java.mockito_core
+    testCompile "org.checkerframework:checker-qual:3.5.0"
+}
+
+task runExpansionService (type: JavaExec) {
+    main = "org.apache.beam.sdk.expansion.service.ExpansionService"
+    classpath = sourceSets.main.runtimeClasspath
+    args = [project.findProperty(":constructionService.port") ?: "8097"]
+}

Review comment:
       I don't think we need this task; Python is responsible for running the expansion service from the compiled jar via subprocess.
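
   Conceptually (this is a sketch, not Beam's actual launcher code), the Python side only needs the shadow jar, because the Main-Class manifest entry set in the gradle shadowClosure above lets it be run directly with `java -jar`:

   ```python
   import subprocess

   def expansion_service_command(jar_path, port=8097):
       # The shadow jar's Main-Class manifest entry means `java -jar`
       # starts the ExpansionService on the given port.
       return ['java', '-jar', jar_path, str(port)]

   def start_expansion_service(jar_path, port=8097):
       # Hypothetical helper: launch the service as a child process,
       # which is roughly what the Python SDK does for BeamJar services.
       return subprocess.Popen(expansion_service_command(jar_path, port))
   ```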

##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -96,25 +98,37 @@
 
 
 def default_io_expansion_service():
-  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+    return BeamJarExpansionService(
+        ':sdks:java:extensions:schemaio-expansion-service:shadowJar')
 
 
-WriteToJdbcSchema = typing.NamedTuple(
-    'WriteToJdbcSchema',
+ReadFromWriteToJdbcSchema = typing.NamedTuple(

Review comment:
       I think the next improvement to SchemaIO/Provider should be to formally differentiate read-only and write-only configuration; it's confusing to combine them this way when it's not needed. The only place it's _needed_ is in SQL, so we should do it there.
   (you don't need to do anything here, I'm just getting some thoughts out)
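
   As a sketch of what that separation might look like (field names here are hypothetical, chosen only to show the split):

   ```python
   import typing

   # Illustrative only: separate read-only and write-only config
   # schemas instead of one combined NamedTuple.
   ReadFromJdbcSchema = typing.NamedTuple(
       'ReadFromJdbcSchema',
       [('table_name', str), ('query', typing.Optional[str])],
   )
   WriteToJdbcSchema = typing.NamedTuple(
       'WriteToJdbcSchema',
       [('table_name', str), ('statement', typing.Optional[str])],
   )
   ```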




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-670717056


   Run Python 3.8 PostCommit





[GitHub] [beam] sclukas77 commented on a change in pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
sclukas77 commented on a change in pull request #12498:
URL: https://github.com/apache/beam/pull/12498#discussion_r473184583



##########
File path: sdks/python/apache_beam/io/jdbc.py
##########
@@ -96,25 +98,37 @@
 
 
 def default_io_expansion_service():
-  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+    return BeamJarExpansionService(
+        ':sdks:java:extensions:schemaio-expansion-service:shadowJar')
 
 
-WriteToJdbcSchema = typing.NamedTuple(
-    'WriteToJdbcSchema',
+ReadFromWriteToJdbcSchema = typing.NamedTuple(

Review comment:
       I agree it is a more concise way of calling it. I updated the name here.







[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-670582948


   FYI: @chamikaramj, @piotr-szuberski - this PR ports over the JDBC external transform to an implementation of SchemaIOProvider, and it adds general purpose code for using SchemaIOProvider implementations cross-language. It's re-using the xlang_jdbcio_it tests Piotr added in Python to verify it.





[GitHub] [beam] TheNeuralBit merged pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #12498:
URL: https://github.com/apache/beam/pull/12498


   





[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-670580751


   I bet there will be an issue with the Python postcommit when it runs the JDBC IO tests, since it depends on a different expansion service now. You'll likely need to change this line to depend on the schemaio expansion service: https://github.com/apache/beam/pull/12145/files#diff-bc30545c02a5426a5d15209ee354475eR225





[GitHub] [beam] TheNeuralBit commented on pull request #12498: [BEAM-10654] Implemented ExternalSchemaIOTransformRegistrar for jdbc

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12498:
URL: https://github.com/apache/beam/pull/12498#issuecomment-677911586


   Run Python PreCommit

