Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/03 16:52:09 UTC

[GitHub] [beam] aromanenko-dev commented on a change in pull request #14856: [BEAM-11873] Add support for writes with returning values in JdbcIO

aromanenko-dev commented on a change in pull request #14856:
URL: https://github.com/apache/beam/pull/14856#discussion_r644964072



##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1222,7 +1239,244 @@ void set(
         throws SQLException;
   }
 
-  /** A {@link PTransform} to write to a JDBC datasource. */
+  /**
+   * A {@link PTransform} to write to a JDBC datasource. Executes statements one by one.
+   *
+   * <p>The INSERT, UPDATE, and DELETE commands sometimes have an optional RETURNING clause that
+   * supports obtaining data from modified rows while they are being manipulated. Output {@link
+   * PCollection} of this transform is a collection of such returning results mapped by {@link
+   * RowMapper}.
+   */
+  @AutoValue
+  public abstract static class WriteWithResults<T, V>
+      extends PTransform<PCollection<T>, PCollection<V>> {

Review comment:
       Would it be better to wrap `V` in a Beam abstract class or interface, e.g. `JdbcWriteResult`?
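
       A minimal sketch of what such a wrapper could look like. `JdbcWriteResult` here is a hypothetical type, not an existing Beam class, and its shape (a simple serializable value holder) is only an assumption for illustration:

```java
import java.io.Serializable;
import java.util.Objects;

/**
 * Hypothetical wrapper for a value returned by a write statement.
 * Not part of Beam; the name and shape are assumptions for illustration.
 */
final class JdbcWriteResult<V> implements Serializable {
  private static final long serialVersionUID = 1L;

  // The value produced by the user's row mapper for one returned row.
  private final V value;

  private JdbcWriteResult(V value) {
    this.value = value;
  }

  /** Wraps a mapped value so the transform's output type is a Beam-owned class. */
  static <V> JdbcWriteResult<V> of(V value) {
    return new JdbcWriteResult<>(value);
  }

  V getValue() {
    return value;
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof JdbcWriteResult
        && Objects.equals(value, ((JdbcWriteResult<?>) o).value);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(value);
  }
}
```

       With something like this, the output could be a `PCollection<JdbcWriteResult<V>>`, which would leave room to add fields later without changing the transform's signature.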

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1070,15 +1070,32 @@ public static RetryConfiguration create(
      *
      * <pre>{@code
      * PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
-     *     .withDataSourceConfiguration(CONF_DB_1).withResults());
+     *     .withDataSourceConfiguration(CONF_DB_1).withVoidResults());
      * data.apply(Wait.on(firstWriteResults))
      *     .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
      * }</pre>
      */
-    public WriteVoid<T> withResults() {
+    public WriteVoid<T> withVoidResults() {
       return inner;
     }
 
+    /**
+     * Returns {@link WriteWithResults} transform that could return a specific result.
+     *
+     * <p>See {@link WriteWithResults}
+     */
+    public <V> WriteWithResults<T, V> withReturningResults(RowMapper<V> rowMapper) {
+      return new AutoValue_JdbcIO_WriteWithResults.Builder<T, V>()

Review comment:
       To avoid copying the same config methods, it might make sense to move them into a separate class, e.g. `JdbcConfig` (like [BigtableConfig](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java)). That said, we can leave this for a later refactoring.
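
       Roughly, the idea could look like the sketch below. `JdbcConfig`, `WriteVoidSketch` and `WriteWithResultsSketch` are hypothetical names, and only two of the settings are shown; this is an illustration of the delegation pattern, not a proposed final API:

```java
import java.io.Serializable;

/** Hypothetical immutable holder for settings shared by several write transforms. */
final class JdbcConfig implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String statement; // null until set
  private final String table; // null until set

  private JdbcConfig(String statement, String table) {
    this.statement = statement;
    this.table = table;
  }

  static JdbcConfig create() {
    return new JdbcConfig(null, null);
  }

  /** Validation lives here once instead of in every transform. */
  JdbcConfig withStatement(String statement) {
    if (statement == null) {
      throw new IllegalArgumentException("statement can not be null");
    }
    return new JdbcConfig(statement, table);
  }

  JdbcConfig withTable(String table) {
    if (table == null) {
      throw new IllegalArgumentException("table name can not be null");
    }
    return new JdbcConfig(statement, table);
  }

  String getStatement() {
    return statement;
  }

  String getTable() {
    return table;
  }
}

/** Hypothetical stand-in for WriteVoid: delegates its setter to the shared config. */
final class WriteVoidSketch {
  private final JdbcConfig config;

  WriteVoidSketch(JdbcConfig config) {
    this.config = config;
  }

  WriteVoidSketch withTable(String table) {
    return new WriteVoidSketch(config.withTable(table));
  }

  JdbcConfig getConfig() {
    return config;
  }
}

/** Hypothetical stand-in for WriteWithResults: same delegation, no copied logic. */
final class WriteWithResultsSketch {
  private final JdbcConfig config;

  WriteWithResultsSketch(JdbcConfig config) {
    this.config = config;
  }

  WriteWithResultsSketch withTable(String table) {
    return new WriteWithResultsSketch(config.withTable(table));
  }

  JdbcConfig getConfig() {
    return config;
  }
}
```

       Each transform still exposes its own `with...` methods, but they become one-line delegations, so the checks and defaults are defined in a single place.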

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -262,10 +262,10 @@ public static ReadRows readRows() {
    * @param <T> Type of the data to be written.
    */
   public static <T> Write<T> write() {
-    return new Write();
+    return new Write<>();
   }
 
-  public static <T> WriteVoid<T> writeVoid() {
+  private static <T> WriteVoid<T> writeVoid() {

Review comment:
       This is a breaking change. Why was it made private? If it is no longer used, we need to deprecate it before removing it (the deprecation period is usually at least 3 releases).

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1222,7 +1239,244 @@ void set(
         throws SQLException;
   }
 
-  /** A {@link PTransform} to write to a JDBC datasource. */
+  /**
+   * A {@link PTransform} to write to a JDBC datasource. Executes statements one by one.
+   *
+   * <p>The INSERT, UPDATE, and DELETE commands sometimes have an optional RETURNING clause that
+   * supports obtaining data from modified rows while they are being manipulated. Output {@link
+   * PCollection} of this transform is a collection of such returning results mapped by {@link
+   * RowMapper}.
+   */
+  @AutoValue
+  public abstract static class WriteWithResults<T, V>
+      extends PTransform<PCollection<T>, PCollection<V>> {
+    abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
+
+    abstract @Nullable ValueProvider<String> getStatement();
+
+    abstract @Nullable PreparedStatementSetter<T> getPreparedStatementSetter();
+
+    abstract @Nullable RetryStrategy getRetryStrategy();
+
+    abstract @Nullable RetryConfiguration getRetryConfiguration();
+
+    abstract @Nullable String getTable();
+
+    abstract @Nullable RowMapper<V> getRowMapper();
+
+    abstract Builder<T, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T, V> {
+      abstract Builder<T, V> setDataSourceProviderFn(
+          SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+      abstract Builder<T, V> setStatement(ValueProvider<String> statement);
+
+      abstract Builder<T, V> setPreparedStatementSetter(PreparedStatementSetter<T> setter);
+
+      abstract Builder<T, V> setRetryStrategy(RetryStrategy deadlockPredicate);
+
+      abstract Builder<T, V> setRetryConfiguration(RetryConfiguration retryConfiguration);
+
+      abstract Builder<T, V> setTable(String table);
+
+      abstract Builder<T, V> setRowMapper(RowMapper<V> rowMapper);
+
+      abstract WriteWithResults<T, V> build();
+    }
+
+    public WriteWithResults<T, V> withDataSourceConfiguration(DataSourceConfiguration config) {
+      return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
+    }
+
+    public WriteWithResults<T, V> withDataSourceProviderFn(
+        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+      return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+    }
+
+    public WriteWithResults<T, V> withStatement(String statement) {
+      return withStatement(ValueProvider.StaticValueProvider.of(statement));
+    }
+
+    public WriteWithResults<T, V> withStatement(ValueProvider<String> statement) {
+      return toBuilder().setStatement(statement).build();
+    }
+
+    public WriteWithResults<T, V> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
+      return toBuilder().setPreparedStatementSetter(setter).build();
+    }
+
+    /**
+     * When a SQL exception occurs, {@link Write} uses this {@link RetryStrategy} to determine if it
+     * will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
+     * then {@link Write} retries the statements.
+     */
+    public WriteWithResults<T, V> withRetryStrategy(RetryStrategy retryStrategy) {
+      checkArgument(retryStrategy != null, "retryStrategy can not be null");
+      return toBuilder().setRetryStrategy(retryStrategy).build();
+    }
+
+    /**
+     * When a SQL exception occurs, {@link Write} uses this {@link RetryConfiguration} to
+     * exponentially back off and retry the statements based on the {@link RetryConfiguration}
+     * mentioned.
+     *
+     * <p>Usage of RetryConfiguration -
+     *
+     * <pre>{@code
+     * pipeline.apply(JdbcIO.<T>write())
+     *    .withReturningResults(...)
+     *    .withDataSourceConfiguration(...)
+     *    .withRetryStrategy(...)
+     *    .withRetryConfiguration(JdbcIO.RetryConfiguration.
+     *        create(5, Duration.standardSeconds(5), Duration.standardSeconds(1))
+     *
+     * }</pre>
+     *
+     * maxDuration and initialDuration are Nullable
+     *
+     * <pre>{@code
+     * pipeline.apply(JdbcIO.<T>write())
+     *    .withReturningResults(...)
+     *    .withDataSourceConfiguration(...)
+     *    .withRetryStrategy(...)
+     *    .withRetryConfiguration(JdbcIO.RetryConfiguration.
+     *        create(5, null, null)
+     *
+     * }</pre>
+     */
+    public WriteWithResults<T, V> withRetryConfiguration(RetryConfiguration retryConfiguration) {
+      checkArgument(retryConfiguration != null, "retryConfiguration can not be null");
+      return toBuilder().setRetryConfiguration(retryConfiguration).build();
+    }
+
+    public WriteWithResults<T, V> withTable(String table) {
+      checkArgument(table != null, "table name can not be null");
+      return toBuilder().setTable(table).build();
+    }
+
+    public WriteWithResults<T, V> withRowMapper(RowMapper<V> rowMapper) {
+      checkArgument(rowMapper != null, "result set getter can not be null");
+      return toBuilder().setRowMapper(rowMapper).build();
+    }
+
+    @Override
+    public PCollection<V> expand(PCollection<T> input) {
+      checkArgument(getStatement() != null, "withStatement() is required");
+      checkArgument(
+          getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
+      checkArgument(
+          (getDataSourceProviderFn() != null),
+          "withDataSourceConfiguration() or withDataSourceProviderFn() is required");
+
+      return input.apply(ParDo.of(new WriteWithResultsFn<>(this)));
+    }
+
+    private static class WriteWithResultsFn<T, V> extends DoFn<T, V> {
+
+      private final WriteWithResults<T, V> spec;
+      private DataSource dataSource;
+      private Connection connection;
+      private PreparedStatement preparedStatement;
+      private static FluentBackoff retryBackOff;
+
+      public WriteWithResultsFn(WriteWithResults<T, V> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() {
+        dataSource = spec.getDataSourceProviderFn().apply(null);
+        RetryConfiguration retryConfiguration = spec.getRetryConfiguration();
+
+        retryBackOff =
+            FluentBackoff.DEFAULT
+                .withInitialBackoff(retryConfiguration.getInitialDuration())
+                .withMaxCumulativeBackoff(retryConfiguration.getMaxDuration())
+                .withMaxRetries(retryConfiguration.getMaxAttempts());
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        T record = context.element();

Review comment:
       Why not write records in batches? I suspect that writing record by record may significantly hurt performance.
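
       A minimal sketch of the buffering I have in mind, kept independent of JDBC so it stands on its own. `BatchBuffer` and its `flusher` callback are hypothetical; in the real `DoFn` the callback would presumably call `PreparedStatement.addBatch()` per record and then `executeBatch()`. Whether returned rows can be read back from a batched statement depends on the driver, so that part would need checking:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers records and hands them to a flusher in groups. */
final class BatchBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> flusher;
  private final List<T> buffer = new ArrayList<>();

  BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.flusher = flusher;
  }

  /** Intended to be called once per element, e.g. from a @ProcessElement method. */
  void add(T record) {
    buffer.add(record);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Intended to be called at the end of a bundle so no records are left behind. */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Pass a copy so the flusher can keep the list after the buffer is cleared.
    flusher.accept(new ArrayList<>(buffer));
    buffer.clear();
  }
}
```

       The final `flush()` matters: without it, a bundle whose size is not a multiple of the batch size would silently drop its trailing records.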

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1070,15 +1070,32 @@ public static RetryConfiguration create(
      *
      * <pre>{@code
      * PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
-     *     .withDataSourceConfiguration(CONF_DB_1).withResults());
+     *     .withDataSourceConfiguration(CONF_DB_1).withVoidResults());
      * data.apply(Wait.on(firstWriteResults))
      *     .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
      * }</pre>
      */
-    public WriteVoid<T> withResults() {
+    public WriteVoid<T> withVoidResults() {

Review comment:
       This is also a breaking change to the user API. Could we avoid it?

##########
File path: sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1070,15 +1070,32 @@ public static RetryConfiguration create(
      *
      * <pre>{@code
      * PCollection<Void> firstWriteResults = data.apply(JdbcIO.write()
-     *     .withDataSourceConfiguration(CONF_DB_1).withResults());
+     *     .withDataSourceConfiguration(CONF_DB_1).withVoidResults());
      * data.apply(Wait.on(firstWriteResults))
      *     .apply(JdbcIO.write().withDataSourceConfiguration(CONF_DB_2));
      * }</pre>
      */
-    public WriteVoid<T> withResults() {
+    public WriteVoid<T> withVoidResults() {
       return inner;
     }
 
+    /**
+     * Returns {@link WriteWithResults} transform that could return a specific result.
+     *
+     * <p>See {@link WriteWithResults}
+     */
+    public <V> WriteWithResults<T, V> withReturningResults(RowMapper<V> rowMapper) {

Review comment:
       I'd suggest keeping and deprecating the old `withResults()` and naming this method (`withReturningResults()`) `withWriteResults()`, as BigtableIO [does](https://github.com/apache/beam/blob/f881a412fe85c64b1caf075160a6c0312995ea45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715), to keep the naming consistent for similar functionality across different IOs.
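
       As a rough sketch of that shape: `WriteSketch` below is a hypothetical stand-in for `JdbcIO.Write`, the row mapper is simplified to a `java.util.function.Function`, and keeping `withVoidResults()` as the replacement for the deprecated method is my assumption rather than a requirement:

```java
import java.util.function.Function;

/** Hypothetical stand-in for JdbcIO.Write, reduced to the methods under discussion. */
final class WriteSketch<T> {
  /** Stand-in for the transform that only signals completion. */
  static final class WriteVoid<T> {}

  /** Stand-in for the transform that emits mapped results. */
  static final class WriteWithResults<T, V> {
    final Function<String, V> rowMapper;

    WriteWithResults(Function<String, V> rowMapper) {
      this.rowMapper = rowMapper;
    }
  }

  private final WriteVoid<T> inner = new WriteVoid<>();

  /** New name from the PR (assumed here to stay as the replacement). */
  WriteVoid<T> withVoidResults() {
    return inner;
  }

  /**
   * Old name, kept so existing pipelines keep compiling.
   *
   * @deprecated use {@link #withVoidResults()} instead.
   */
  @Deprecated
  WriteVoid<T> withResults() {
    return withVoidResults();
  }

  /** Suggested name for the result-returning variant, mirroring BigtableIO's naming. */
  <V> WriteWithResults<T, V> withWriteResults(Function<String, V> rowMapper) {
    return new WriteWithResults<>(rowMapper);
  }
}
```

       Existing callers of `withResults()` would then only see a deprecation warning, and the method could be removed once the deprecation period has passed.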




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org