You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yh...@apache.org on 2023/10/19 14:42:20 UTC
[beam] branch master updated: [JdbcIO] Add fetchSize to the schema provider for partitioned reads (#29015)
This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 95be6b9bd00 [JdbcIO] Add fetchSize to the schema provider for partitioned reads (#29015)
95be6b9bd00 is described below
commit 95be6b9bd004152dba0b1451c0acde316a5a63c8
Author: Bruno Volpato <bv...@google.com>
AuthorDate: Thu Oct 19 10:42:14 2023 -0400
[JdbcIO] Add fetchSize to the schema provider for partitioned reads (#29015)
---
.../io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 6 ++++++
.../main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java | 6 ++++++
2 files changed, 12 insertions(+)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index f8dad23d1fb..e2a4a8e1072 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -1288,6 +1288,12 @@ public class JdbcIO {
return toBuilder().setPartitionColumn(partitionColumn).build();
}
+ /** The number of rows to fetch from the database in the same {@link ResultSet} round-trip. */
+ public ReadWithPartitions<T, PartitionColumnT> withFetchSize(int fetchSize) {
+ checkArgument(fetchSize > 0, "fetchSize can not be less than 1");
+ return toBuilder().setFetchSize(fetchSize).build();
+ }
+
/** Data output type is {@link Row}, and schema is auto-inferred from the database. */
public ReadWithPartitions<T, PartitionColumnT> withRowOutput() {
return toBuilder().setUseBeamSchema(true).build();
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
index c68b33a0260..4b5dc0d7e24 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
@@ -134,6 +134,12 @@ public class JdbcSchemaIOProvider implements SchemaIOProvider {
if (partitions != null) {
readRows = readRows.withNumPartitions(partitions);
}
+
+ @Nullable Short fetchSize = config.getInt16("fetchSize");
+ if (fetchSize != null) {
+ readRows = readRows.withFetchSize(fetchSize);
+ }
+
return input.apply(readRows);
} else {