You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/01 00:32:52 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #15863: [BEAM-13184] Autosharding for JdbcIO.write* transforms

chamikaramj commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r759755819



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          getAutoSharding() == null
+              || (getAutoSharding() && input.isBounded() != IsBounded.UNBOUNDED),
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {

Review comment:
       "&& getAutoSharding()" is redundant ?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          getAutoSharding() == null
+              || (getAutoSharding() && input.isBounded() != IsBounded.UNBOUNDED),
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))
+                .apply(
+                    GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+                        .withMaxBufferingDuration(Duration.millis(200))
+                        .withShardedKey())
+                .apply(Values.create());
+      } else {
+        iterables =
+            input.apply(
+                ParDo.of(
+                    new DoFn<T, Iterable<T>>() {
+                      List<T> outputList;
+
+                      @ProcessElement
+                      public void process(ProcessContext c) {
+                        if (outputList == null) {
+                          outputList = new ArrayList<>();
+                        }
+                        outputList.add(c.element());

Review comment:
       Won't we run into OOMs if this list grows too much ?

##########
File path: sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
##########
@@ -258,6 +263,40 @@ private PipelineResult runRead() {
     return pipelineRead.run();
   }
 
+  @Test
+  public void testWriteWithAutosharding() throws Exception {
+    String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    DatabaseTestHelper.createTable(dataSource, firstTableName);
+    try {
+      List<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
+      TestStream.Builder<KV<Integer, String>> ts =
+          TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+              .advanceWatermarkTo(Instant.now());
+      for (KV<Integer, String> elm : data) {
+        ts.addElements(elm);
+      }
+
+      PCollection<KV<Integer, String>> dataCollection =
+          pipelineWrite.apply(ts.advanceWatermarkToInfinity());
+      dataCollection.apply(
+          JdbcIO.<KV<Integer, String>>write()
+              .withDataSourceProviderFn(voidInput -> dataSource)
+              .withStatement(String.format("insert into %s values(?, ?) returning *", tableName))
+              .withAutoSharding()

Review comment:
       Are we actually able to test that auto-sharding worked somehow ?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1494,8 +1509,49 @@ void set(
       checkArgument(
           (getDataSourceProviderFn() != null),
           "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
-      return input.apply(
+      checkArgument(
+          ((getAutoSharding() == null || !getAutoSharding())
+                  && input.isBounded() == IsBounded.BOUNDED)
+              || input.isBounded() == IsBounded.UNBOUNDED,
+          "Autosharding is only supported for streaming pipelines.");
+      ;
+
+      PCollection<Iterable<T>> iterables;
+      if (input.isBounded() == IsBounded.UNBOUNDED
+          && getAutoSharding() != null
+          && getAutoSharding()) {
+        iterables =
+            input
+                .apply(WithKeys.<String, T>of(""))

Review comment:
       Should we consider moving this WithKeys.of() and the subsequent Values.create() into GroupIntoBatches to improve it's user interface (in a separate PR) ?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1432,6 +1442,11 @@ void set(
       return toBuilder().setPreparedStatementSetter(setter).build();
     }
 
+    /** If true, enables using a dynamically determined number of shards to write. */
+    public WriteWithResults<T, V> withAutoSharding() {

Review comment:
       Yeah, that sounds like a separate feature. The purpose of this feature is for the runner to determine the appropriate amount of sharding automatically.

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1708,7 +1777,41 @@ void set(
         checkArgument(
             spec.getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
       }
-      return input
+      PCollection<Iterable<T>> iterables;

Review comment:
       Can we share code between WriteWithResult and WriteVoid ? This introduces a significant amount of code duplication.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org