Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/06 07:06:19 UTC

[GitHub] [beam] byronellis opened a new pull request, #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

byronellis opened a new pull request, #24918:
URL: https://github.com/apache/beam/pull/24918

   SchemaTransform implementation for JdbcIO. It follows the pattern of various other SchemaTransforms: Read and Write are split into separate transforms, and the URN pattern is consistent with other transforms such as Kafka. The configurations are largely intended to match the existing SchemaIO implementation (which may have issues with usernames and passwords; I would handle that in a separate PR).
   
   Fixes #21408
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #24918:
URL: https://github.com/apache/beam/pull/24918#issuecomment-1377712849

   retest this please




[GitHub] [beam] Abacn commented on pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #24918:
URL: https://github.com/apache/beam/pull/24918#issuecomment-1377944677

   Thanks @byronellis @ahmedabu98 !




[GitHub] [beam] byronellis commented on a diff in pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
byronellis commented on code in PR #24918:
URL: https://github.com/apache/beam/pull/24918#discussion_r1063949633


##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class JdbcReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<JdbcReadSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    if (configuration.getReadQuery() != null && configuration.getLocation() != null) {
+      throw new IllegalArgumentException(
+          "ReadQuery and Location are mutually exclusive configurations");
+    }
+    if (configuration.getReadQuery() == null && configuration.getLocation() == null) {
+      throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
+    }
+    return new JdbcReadSchemaTransform(configuration);
+  }
+
+  static class JdbcReadSchemaTransform implements SchemaTransform, Serializable {
+
+    JdbcReadSchemaTransformConfiguration config;
+
+    public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
+      this.config = config;
+    }
+
+    protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      JdbcIO.DataSourceConfiguration dsConfig =
+          JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(), config.getJdbcUrl())
+              .withUsername(config.getUsername())
+              .withPassword(config.getPassword());
+      String connectionProperties = config.getConnectionProperties();
+      if (connectionProperties != null) {
+        dsConfig = dsConfig.withConnectionProperties(connectionProperties);
+      }
+
+      List<@org.checkerframework.checker.nullness.qual.Nullable String> initialSql =
+          config.getConnectionInitSql();
+      if (initialSql != null && initialSql.size() > 0) {
+        dsConfig = dsConfig.withConnectionInitSqls(initialSql);
+      }
+
+      return dsConfig;
+    }
+
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized PTransform<
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+        buildTransform() {
+      return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+        @Override
+        public PCollectionRowTuple expand(PCollectionRowTuple input) {
+          String query = config.getReadQuery();
+          if (query == null) {
+            query = String.format("SELECT * FROM %s", config.getLocation());
+          }
+          JdbcIO.ReadRows readRows =
+              JdbcIO.readRows()
+                  .withDataSourceConfiguration(dataSourceConfiguration())
+                  .withQuery(query);
+          Short fetchSize = config.getFetchSize();
+          if (fetchSize != null) {

Review Comment:
   Yup, I also modified username and password to account for this behavior.





[GitHub] [beam] Abacn merged pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
Abacn merged PR #24918:
URL: https://github.com/apache/beam/pull/24918




[GitHub] [beam] github-actions[bot] commented on pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24918:
URL: https://github.com/apache/beam/pull/24918#issuecomment-1373266824

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @Abacn for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] github-actions[bot] commented on pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24918:
URL: https://github.com/apache/beam/pull/24918#issuecomment-1373885849

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #24918:
URL: https://github.com/apache/beam/pull/24918#discussion_r1065000676


##########
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java:
##########
@@ -65,28 +65,68 @@ public static void beforeClass() throws Exception {
   }
 
   @Test
-  public void testInvalidOptions() {
-    JdbcReadSchemaTransformProvider provider = new JdbcReadSchemaTransformProvider();
+  public void testInvalidReadSchemaOptions() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("")
+              .setJdbcUrl("")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .setReadQuery("Query")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .build()
+              .validate();
+        });
+  }
 
+  @Test
+  public void testInvalidWriteSchemaOptions() {

Review Comment:
   Should this test be moved to `JdbcWriteSchemaTransformProviderTest`?



##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java:
##########
@@ -164,6 +158,26 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser
     @Nullable
     public abstract Boolean getOutputParallelization();
 
+    public void validate() throws IllegalArgumentException {
+      if ("".equals(getDriverClassName())) {
+        throw new IllegalArgumentException("JDBC Driver class name cannot be blank.");
+      }
+      if ("".equals(getJdbcUrl())) {
+        throw new IllegalArgumentException("JDBC URL cannot be blank");
+      }

Review Comment:
   Let's check for null values too (here and in the write provider).





[GitHub] [beam] byronellis commented on a diff in pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
byronellis commented on code in PR #24918:
URL: https://github.com/apache/beam/pull/24918#discussion_r1063949699


##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,197 @@
+@AutoService(SchemaTransformProvider.class)
+public class JdbcReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<JdbcReadSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    if (configuration.getReadQuery() != null && configuration.getLocation() != null) {

Review Comment:
   This was getting a little overloaded, so I moved the checks into a `validate` method and applied the suggested changes.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #24918:
URL: https://github.com/apache/beam/pull/24918#discussion_r1063854607


##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,197 @@
+@AutoService(SchemaTransformProvider.class)
+public class JdbcReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<JdbcReadSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    if (configuration.getReadQuery() != null && configuration.getLocation() != null) {

Review Comment:
   ```suggestion
       if (!Strings.isNullOrEmpty(configuration.getReadQuery()) && !Strings.isNullOrEmpty(configuration.getLocation())) {
   ```
   
   Empty strings may be sent from a remote SDK in place of null values. For example, the Python SDK does not currently support sending `None` values, so no configuration field is ever actually null; for strings, `""` is sent instead.
   
   We should apply similar checks to the relevant configuration fields in this provider and in the JdbcWrite provider.
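
To make the point about empty strings concrete, here is a minimal sketch of the mutual-exclusion check that treats `""` the same as null. The class `ReadSourceCheck` and its methods are hypothetical names used only for illustration; they are not part of Beam or of this PR. It uses plain Java instead of Guava's `Strings.isNullOrEmpty` so that it compiles on its own.

```java
// Illustrative sketch only: ReadSourceCheck, isSet and resolveQuery are
// hypothetical names, not part of Beam or of this PR.
final class ReadSourceCheck {
  private ReadSourceCheck() {}

  /** Treats both null and "" as "not set", since a remote SDK may send "" instead of null. */
  static boolean isSet(String value) {
    return value != null && !value.isEmpty();
  }

  /** Returns the query to run, or throws if neither or both of the two sources are set. */
  static String resolveQuery(String readQuery, String location) {
    boolean hasQuery = isSet(readQuery);
    boolean hasLocation = isSet(location);
    if (hasQuery && hasLocation) {
      throw new IllegalArgumentException(
          "ReadQuery and Location are mutually exclusive configurations");
    }
    if (!hasQuery && !hasLocation) {
      throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
    }
    // Same fallback as in the diff: read the whole table when only a location is given.
    return hasQuery ? readQuery : String.format("SELECT * FROM %s", location);
  }
}
```

With a check of this shape, a configuration arriving with `readQuery = ""` and a non-empty location is accepted as "location only" instead of being rejected as having both fields set.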



##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,197 @@
+@AutoService(SchemaTransformProvider.class)
+public class JdbcReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<JdbcReadSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    if (configuration.getReadQuery() != null && configuration.getLocation() != null) {
+      throw new IllegalArgumentException(
+          "ReadQuery and Location are mutually exclusive configurations");
+    }
+    if (configuration.getReadQuery() == null && configuration.getLocation() == null) {
+      throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
+    }
+    return new JdbcReadSchemaTransform(configuration);
+  }
+
+  static class JdbcReadSchemaTransform implements SchemaTransform, Serializable {
+
+    JdbcReadSchemaTransformConfiguration config;
+
+    public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
+      this.config = config;
+    }
+
+    protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      JdbcIO.DataSourceConfiguration dsConfig =
+          JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(), config.getJdbcUrl())

Review Comment:
   Maybe `getDriverClassName()` and `getJdbcUrl()` should have some handling too, throwing an early error if they're empty or null.
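
As a rough illustration of what such early handling could look like, the sketch below rejects null, empty, and whitespace-only values for a required field. `RequiredFieldCheck` and `requireNonBlank` are hypothetical names chosen for this example, and rejecting whitespace-only values is an extra assumption beyond what the comment asks for.

```java
// Illustrative sketch only: RequiredFieldCheck and requireNonBlank are
// hypothetical names, not part of Beam or of this PR.
final class RequiredFieldCheck {
  private RequiredFieldCheck() {}

  /**
   * Returns the value unchanged if it is usable, otherwise fails fast.
   * Null, "" and whitespace-only strings are all treated as missing.
   */
  static String requireNonBlank(String value, String fieldName) {
    if (value == null || value.trim().isEmpty()) {
      throw new IllegalArgumentException(fieldName + " cannot be null or blank.");
    }
    return value;
  }
}
```

A provider could call this once for the driver class name and once for the JDBC URL before building the data source configuration, so a bad configuration fails at construction time, not when the pipeline first tries to connect.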



##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,197 @@
+@AutoService(SchemaTransformProvider.class)
+public class JdbcReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<JdbcReadSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    if (configuration.getReadQuery() != null && configuration.getLocation() != null) {
+      throw new IllegalArgumentException(
+          "ReadQuery and Location are mutually exclusive configurations");
+    }
+    if (configuration.getReadQuery() == null && configuration.getLocation() == null) {
+      throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
+    }
+    return new JdbcReadSchemaTransform(configuration);
+  }
+
+  static class JdbcReadSchemaTransform implements SchemaTransform, Serializable {
+
+    JdbcReadSchemaTransformConfiguration config;
+
+    public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
+      this.config = config;
+    }
+
+    protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      JdbcIO.DataSourceConfiguration dsConfig =
+          JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(), config.getJdbcUrl())
+              .withUsername(config.getUsername())
+              .withPassword(config.getPassword());
+      String connectionProperties = config.getConnectionProperties();
+      if (connectionProperties != null) {
+        dsConfig = dsConfig.withConnectionProperties(connectionProperties);
+      }
+
+      List<@org.checkerframework.checker.nullness.qual.Nullable String> initialSql =
+          config.getConnectionInitSql();
+      if (initialSql != null && initialSql.size() > 0) {
+        dsConfig = dsConfig.withConnectionInitSqls(initialSql);
+      }
+
+      return dsConfig;
+    }
+
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized PTransform<
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple,
+            @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple>
+        buildTransform() {
+      return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
+        @Override
+        public PCollectionRowTuple expand(PCollectionRowTuple input) {
+          String query = config.getReadQuery();
+          if (query == null) {
+            query = String.format("SELECT * FROM %s", config.getLocation());
+          }
+          JdbcIO.ReadRows readRows =
+              JdbcIO.readRows()
+                  .withDataSourceConfiguration(dataSourceConfiguration())
+                  .withQuery(query);
+          Short fetchSize = config.getFetchSize();
+          if (fetchSize != null) {

Review Comment:
   ```suggestion
             if (fetchSize != null && fetchSize > 0) {
   ```
   
   For the same reasons mentioned above, fetchSize could have a default value of 0 in place of null.
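
The same idea can be sketched for the numeric case: a fetch size of 0 is treated as "not provided", just as `""` is for strings. `FetchSizeCheck` and `shouldApplyFetchSize` are hypothetical names for this example only.

```java
// Illustrative sketch only: FetchSizeCheck and shouldApplyFetchSize are
// hypothetical names, not part of Beam or of this PR.
final class FetchSizeCheck {
  private FetchSizeCheck() {}

  /**
   * Returns true only when a fetch size was explicitly provided.
   * A remote SDK may send 0 in place of null, so 0 counts as "unset".
   */
  static boolean shouldApplyFetchSize(Short fetchSize) {
    return fetchSize != null && fetchSize > 0;
  }
}
```

The caller would apply the fetch size to the read only when this returns true, and otherwise leave the reader's default in place.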





[GitHub] [beam] Abacn commented on pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #24918:
URL: https://github.com/apache/beam/pull/24918#issuecomment-1377712271

   Looks like Jenkins was down briefly and the tests did not run.




[GitHub] [beam] byronellis commented on pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
byronellis commented on PR #24918:
URL: https://github.com/apache/beam/pull/24918#issuecomment-1377788192

   Took the opportunity to add a couple of documentation blurbs :-)




[GitHub] [beam] byronellis commented on a diff in pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
byronellis commented on code in PR #24918:
URL: https://github.com/apache/beam/pull/24918#discussion_r1063949610


##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+@AutoService(SchemaTransformProvider.class)
+public class JdbcReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration> {
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized Class<JdbcReadSchemaTransformConfiguration>
+      configurationClass() {
+    return JdbcReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    if (configuration.getReadQuery() != null && configuration.getLocation() != null) {
+      throw new IllegalArgumentException(
+          "ReadQuery and Location are mutually exclusive configurations");
+    }
+    if (configuration.getReadQuery() == null && configuration.getLocation() == null) {
+      throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
+    }
+    return new JdbcReadSchemaTransform(configuration);
+  }
+
+  static class JdbcReadSchemaTransform implements SchemaTransform, Serializable {
+
+    JdbcReadSchemaTransformConfiguration config;
+
+    public JdbcReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
+      this.config = config;
+    }
+
+    protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
+      JdbcIO.DataSourceConfiguration dsConfig =
+          JdbcIO.DataSourceConfiguration.create(config.getDriverClassName(), config.getJdbcUrl())

Review Comment:
   Updated validation to check for blank strings
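
A rough sketch of what blank-string validation for a read configuration could look like, combined with the mutual-exclusion checks on the query and location shown in the diff above. `ReadConfigSketch` and `isBlank` are hypothetical stand-ins, not the PR's `JdbcReadSchemaTransformConfiguration`. One assumption goes beyond the quoted code: this sketch also rejects `null` and whitespace-only values, whereas a plain `"".equals(...)` check only catches the empty string.

```java
final class ReadConfigSketch {
  final String driverClassName;
  final String jdbcUrl;
  final String readQuery; // may be null
  final String location; // may be null

  ReadConfigSketch(String driverClassName, String jdbcUrl, String readQuery, String location) {
    this.driverClassName = driverClassName;
    this.jdbcUrl = jdbcUrl;
    this.readQuery = readQuery;
    this.location = location;
  }

  // Assumption: null and whitespace-only strings count as blank.
  private static boolean isBlank(String s) {
    return s == null || s.trim().isEmpty();
  }

  void validate() {
    if (isBlank(driverClassName)) {
      throw new IllegalArgumentException("JDBC Driver class name cannot be blank.");
    }
    if (isBlank(jdbcUrl)) {
      throw new IllegalArgumentException("JDBC URL cannot be blank.");
    }
    boolean hasQuery = !isBlank(readQuery);
    boolean hasLocation = !isBlank(location);
    // Exactly one of the two sources must be provided.
    if (hasQuery && hasLocation) {
      throw new IllegalArgumentException(
          "ReadQuery and Location are mutually exclusive configurations");
    }
    if (!hasQuery && !hasLocation) {
      throw new IllegalArgumentException("Either ReadQuery or Location must be set.");
    }
  }
}
```

Keeping all checks in a single `validate()` method means a misconfiguration fails before any connection to the database is attempted.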





[GitHub] [beam] Abacn commented on pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #24918:
URL: https://github.com/apache/beam/pull/24918#issuecomment-1373884744

   R: @ahmedabu98 for schema transform PRs




[GitHub] [beam] byronellis commented on a diff in pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
byronellis commented on code in PR #24918:
URL: https://github.com/apache/beam/pull/24918#discussion_r1065959831


##########
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java:
##########
@@ -65,28 +65,68 @@ public static void beforeClass() throws Exception {
   }
 
   @Test
-  public void testInvalidOptions() {
-    JdbcReadSchemaTransformProvider provider = new JdbcReadSchemaTransformProvider();
+  public void testInvalidReadSchemaOptions() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("")
+              .setJdbcUrl("")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .setLocation("Location")
+              .setReadQuery("Query")
+              .build()
+              .validate();
+        });
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> {
+          JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+              .setDriverClassName("ClassName")
+              .setJdbcUrl("JdbcUrl")
+              .build()
+              .validate();
+        });
+  }
 
+  @Test
+  public void testInvalidWriteSchemaOptions() {

Review Comment:
   Good catch! Moved.



##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java:
##########
@@ -164,6 +158,26 @@ public abstract static class JdbcReadSchemaTransformConfiguration implements Ser
     @Nullable
     public abstract Boolean getOutputParallelization();
 
+    public void validate() throws IllegalArgumentException {
+      if ("".equals(getDriverClassName())) {
+        throw new IllegalArgumentException("JDBC Driver class name cannot be blank.");
+      }
+      if ("".equals(getJdbcUrl())) {
+        throw new IllegalArgumentException("JDBC URL cannot be blank");
+      }

Review Comment:
   Sure thing. Done.





[GitHub] [beam] byronellis commented on pull request #24918: JDBC SchemaTransform implementation. Need to break out into a separat…

Posted by GitBox <gi...@apache.org>.
byronellis commented on PR #24918:
URL: https://github.com/apache/beam/pull/24918#issuecomment-1377932461

   @Abacn All tests are now passing

