You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/11/22 18:38:26 UTC
[beam] branch master updated: Add retry to test connections (#23757)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 67e2008c0ee Add retry to test connections (#23757)
67e2008c0ee is described below
commit 67e2008c0ee6878e6a95a361e41d02734fba4ad1
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Tue Nov 22 13:38:18 2022 -0500
Add retry to test connections (#23757)
* Move connection setup logic for JDBCIO WriteFn to @startBundle to limit parallel calls to datasource.getConnection()
* move connection setup logic back to processElement.
Put a retry into the DatabaseTestHelper
* run spotless
* use fluent backoff instead of manual implementation
* refactor to manual resource management
* run spotless
---
.../beam/sdk/io/common/DatabaseTestHelper.java | 42 ++++++++++++++++++++--
1 file changed, 39 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
index 3cfe08ba4d1..1204fa7ada6 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
@@ -34,8 +34,13 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.sql.DataSource;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.JdbcDatabaseContainer;
@@ -86,9 +91,40 @@ public class DatabaseTestHelper {
fieldsAndTypes.stream()
.map(kv -> kv.getKey() + " " + kv.getValue())
.collect(Collectors.joining(", "));
- try (Connection connection = dataSource.getConnection()) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(String.format("create table %s (%s)", tableName, fieldsList));
+ SQLException exception = null;
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.standardSeconds(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+ .withMaxRetries(4)
+ .backoff();
+ while (true) {
+ // This is not implemented as try-with-resources because it appears that try-with-resources is
+ // not correctly catching the PSQLException thrown by dataSource.getConnection()
+ Connection connection = null;
+ try {
+ connection = dataSource.getConnection();
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(String.format("create table %s (%s)", tableName, fieldsList));
+ return;
+ }
+ } catch (SQLException e) {
+ exception = e;
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ boolean hasNext;
+ try {
+ hasNext = BackOffUtils.next(sleeper, backoff);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (!hasNext) {
+ // we tried the max number of times
+ throw exception;
}
}
}