You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/11/22 18:38:26 UTC

[beam] branch master updated: Add retry to test connections (#23757)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 67e2008c0ee Add retry to test connections (#23757)
67e2008c0ee is described below

commit 67e2008c0ee6878e6a95a361e41d02734fba4ad1
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Tue Nov 22 13:38:18 2022 -0500

    Add retry to test connections (#23757)
    
    * Move connection setup logic for JDBCIO WriteFn to @startBundle to limit parallel calls to datasource.getConnection()
    
    * move connection setup logic back to processElement.
    Put a retry into the DatabaseTestHelper
    
    * run spotless
    
    * use fluent backoff instead of manual implementation
    
    * refactor to manual resource management
    
    * run spotless
---
 .../beam/sdk/io/common/DatabaseTestHelper.java     | 42 ++++++++++++++++++++--
 1 file changed, 39 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
index 3cfe08ba4d1..1204fa7ada6 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
@@ -34,8 +34,13 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import javax.sql.DataSource;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
 import org.postgresql.ds.PGSimpleDataSource;
 import org.testcontainers.containers.JdbcDatabaseContainer;
 
@@ -86,9 +91,40 @@ public class DatabaseTestHelper {
         fieldsAndTypes.stream()
             .map(kv -> kv.getKey() + " " + kv.getValue())
             .collect(Collectors.joining(", "));
-    try (Connection connection = dataSource.getConnection()) {
-      try (Statement statement = connection.createStatement()) {
-        statement.execute(String.format("create table %s (%s)", tableName, fieldsList));
+    SQLException exception = null;
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backoff =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(Duration.standardSeconds(1))
+            .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+            .withMaxRetries(4)
+            .backoff();
+    while (true) {
+      // This is not implemented as try-with-resources because it appears that try-with-resources is
+      // not correctly catching the PSQLException thrown by dataSource.getConnection()
+      Connection connection = null;
+      try {
+        connection = dataSource.getConnection();
+        try (Statement statement = connection.createStatement()) {
+          statement.execute(String.format("create table %s (%s)", tableName, fieldsList));
+          return;
+        }
+      } catch (SQLException e) {
+        exception = e;
+      } finally {
+        if (connection != null) {
+          connection.close();
+        }
+      }
+      boolean hasNext;
+      try {
+        hasNext = BackOffUtils.next(sleeper, backoff);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      if (!hasNext) {
+        // we tried the max number of times
+        throw exception;
       }
     }
   }