You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/10/19 14:42:20 UTC

[beam] branch master updated: [JdbcIO] Add fetchSize to the schema provider for partitioned reads (#29015)

This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 95be6b9bd00 [JdbcIO] Add fetchSize to the schema provider for partitioned reads (#29015)
95be6b9bd00 is described below

commit 95be6b9bd004152dba0b1451c0acde316a5a63c8
Author: Bruno Volpato <bv...@google.com>
AuthorDate: Thu Oct 19 10:42:14 2023 -0400

    [JdbcIO] Add fetchSize to the schema provider for partitioned reads (#29015)
---
 .../io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 6 ++++++
 .../main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java | 6 ++++++
 2 files changed, 12 insertions(+)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index f8dad23d1fb..e2a4a8e1072 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -1288,6 +1288,12 @@ public class JdbcIO {
       return toBuilder().setPartitionColumn(partitionColumn).build();
     }
 
+    /** Number of rows to fetch per {@link ResultSet} round-trip; must be at least 1. */
+    public ReadWithPartitions<T, PartitionColumnT> withFetchSize(int fetchSize) {
+      checkArgument(fetchSize > 0, "fetchSize can not be less than 1");
+      return toBuilder().setFetchSize(fetchSize).build();
+    }
+
     /** Data output type is {@link Row}, and schema is auto-inferred from the database. */
     public ReadWithPartitions<T, PartitionColumnT> withRowOutput() {
       return toBuilder().setUseBeamSchema(true).build();
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
index c68b33a0260..4b5dc0d7e24 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
@@ -134,6 +134,12 @@ public class JdbcSchemaIOProvider implements SchemaIOProvider {
             if (partitions != null) {
               readRows = readRows.withNumPartitions(partitions);
             }
+
+            @Nullable Short fetchSize = config.getInt16("fetchSize");
+            if (fetchSize != null) {
+              readRows = readRows.withFetchSize(fetchSize);
+            }
+
             return input.apply(readRows);
           } else {