You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/07/10 17:44:36 UTC
[beam] branch master updated: [BEAM-9629] Fix several connection leak issues. (#12209)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 47a6114 [BEAM-9629] Fix several connection leak issues. (#12209)
47a6114 is described below
commit 47a611423053dc50bd55a156a5f58c040c9ab1cd
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Fri Jul 10 10:44:13 2020 -0700
[BEAM-9629] Fix several connection leak issues. (#12209)
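* Stop capping the default connection pool at a single connection by removing `poolConfig.setMaxTotal(1)`, so that the pool is no longer limited to one connection (the intent is an effectively unlimited maximum). This mirrors the behavior of 2.17 and earlier, which created one DataSource (and hence one connection) per execution thread. Note: removing the call leaves the pool library's default maximum in effect; it does not set an explicit "no limit" value such as `setMaxTotal(-1)`.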
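* Acquire connections lazily so that no connection is held for empty bundles. Reads now obtain the connection in @ProcessElement. Writes no longer open it in @StartBundle; instead they open it (and prepare the statement) only when a non-empty batch of records is written.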
* Also close the connection (and, for writes, the prepared statement) from finalize(), so that it is returned to the pool even if the bundle fails before @FinishBundle runs.
---
.../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 69 ++++++++++++++++------
1 file changed, 51 insertions(+), 18 deletions(-)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index f52ff40..d03925e 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -147,10 +147,10 @@ import org.slf4j.LoggerFactory;
* );
* }</pre>
*
- * By default, the provided function instantiates a DataSource per execution thread. In some
- * circumstances, such as DataSources that have a pool of connections, this can quickly overwhelm
- * the database by requesting too many connections. In that case you should make the DataSource a
- * static singleton so it gets instantiated only once per JVM.
+ * <p>By default, the provided function requests a DataSource per execution thread. In some
+ * circumstances this can quickly overwhelm the database by requesting too many connections. In that
+ * case you should look into sharing a single instance of a {@link PoolingDataSource} across all the
+ * execution threads.
*
* <h3>Writing to JDBC datasource</h3>
*
@@ -883,11 +883,14 @@ public class JdbcIO {
@Setup
public void setup() throws Exception {
dataSource = dataSourceProviderFn.apply(null);
- connection = dataSource.getConnection();
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
+ // Only acquire the connection if we need to perform a read.
+ if (connection == null) {
+ connection = dataSource.getConnection();
+ }
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
@@ -901,9 +904,24 @@ public class JdbcIO {
}
}
- @Teardown
- public void teardown() throws Exception {
- connection.close();
+ @FinishBundle
+ public void finishBundle() throws Exception {
+ cleanUpConnection();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ cleanUpConnection();
+ }
+
+ private void cleanUpConnection() throws Exception {
+ if (connection != null) {
+ try {
+ connection.close();
+ } finally {
+ connection = null;
+ }
+ }
}
}
@@ -1354,13 +1372,6 @@ public class JdbcIO {
.withMaxRetries(retryConfiguration.getMaxAttempts());
}
- @StartBundle
- public void startBundle() throws Exception {
- connection = dataSource.getConnection();
- connection.setAutoCommit(false);
- preparedStatement = connection.prepareStatement(spec.getStatement().get());
- }
-
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
T record = context.element();
@@ -1385,13 +1396,30 @@ public class JdbcIO {
@FinishBundle
public void finishBundle() throws Exception {
executeBatch();
+ cleanUpStatementAndConnection();
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ cleanUpStatementAndConnection();
+ }
+
+ private void cleanUpStatementAndConnection() throws Exception {
try {
if (preparedStatement != null) {
- preparedStatement.close();
+ try {
+ preparedStatement.close();
+ } finally {
+ preparedStatement = null;
+ }
}
} finally {
if (connection != null) {
- connection.close();
+ try {
+ connection.close();
+ } finally {
+ connection = null;
+ }
}
}
}
@@ -1400,6 +1428,12 @@ public class JdbcIO {
if (records.isEmpty()) {
return;
}
+ // Only acquire the connection if there is something to write.
+ if (connection == null) {
+ connection = dataSource.getConnection();
+ connection.setAutoCommit(false);
+ preparedStatement = connection.prepareStatement(spec.getStatement().get());
+ }
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = retryBackOff.backoff();
while (true) {
@@ -1498,7 +1532,6 @@ public class JdbcIO {
PoolableConnectionFactory poolableConnectionFactory =
new PoolableConnectionFactory(connectionFactory, null);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
- poolConfig.setMaxTotal(1);
poolConfig.setMinIdle(0);
poolConfig.setMinEvictableIdleTimeMillis(10000);
poolConfig.setSoftMinEvictableIdleTimeMillis(30000);