You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/04/14 12:12:15 UTC
[camel] branch camel-3.x updated: CAMEL-19256: camel-jdbc - close stmt in the finnal block (#9827) (#9860)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push:
new f629ca35774 CAMEL-19256: camel-jdbc - close stmt in the finnal block (#9827) (#9860)
f629ca35774 is described below
commit f629ca3577478b6c9f50bf824cecce09a349f27a
Author: Zheng Feng <zh...@gmail.com>
AuthorDate: Fri Apr 14 20:12:08 2023 +0800
CAMEL-19256: camel-jdbc - close stmt in the finnal block (#9827) (#9860)
---
.../apache/camel/component/jdbc/JdbcProducer.java | 96 ++++++++++++++--------
1 file changed, 63 insertions(+), 33 deletions(-)
diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
index a774d6249f2..c09f96e413c 100644
--- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
+++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
@@ -32,7 +32,6 @@ import javax.sql.DataSource;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
-import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.support.SynchronizationAdapter;
@@ -103,6 +102,21 @@ public class JdbcProducer extends DefaultProducer {
if (shouldCloseResources) {
resetAutoCommit(conn, autoCommit);
closeQuietly(conn);
+ } else {
+ final Connection finalConn = conn;
+ final boolean finalAutoCommit = autoCommit;
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ resetAutoCommit(finalConn, finalAutoCommit);
+ closeQuietly(finalConn);
+ }
+
+ @Override
+ public int getOrder() {
+ return LOWEST + 200;
+ }
+ });
}
}
}
@@ -118,6 +132,19 @@ public class JdbcProducer extends DefaultProducer {
} finally {
if (shouldCloseResources && !connectionStrategy.isConnectionTransactional(conn, dataSource)) {
closeQuietly(conn);
+ } else {
+ final Connection finalConn = conn;
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ closeQuietly(finalConn);
+ }
+
+ @Override
+ public int getOrder() {
+ return LOWEST + 200;
+ }
+ });
}
}
}
@@ -188,6 +215,22 @@ public class JdbcProducer extends DefaultProducer {
if (shouldCloseResources) {
closeQuietly(rs);
closeQuietly(ps);
+ } else {
+ final Statement finalPs = ps;
+ final ResultSet finalRs = rs;
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ closeQuietly(finalRs);
+ closeQuietly(finalPs);
+ }
+
+ @Override
+ public int getOrder() {
+ // Make sure it happens before close Connection.
+ return LOWEST + 100;
+ }
+ });
}
}
return shouldCloseResources;
@@ -195,21 +238,12 @@ public class JdbcProducer extends DefaultProducer {
private boolean doCreateAndExecuteSqlStatement(Exchange exchange, String sql, Connection conn) throws Exception {
+ Statement stmt = null;
ResultSet rs = null;
boolean shouldCloseResources = true;
try {
- // We might need to leave it open to allow post-processing of the result set. This is why we
- // are not using try-with-resources here.
- final Statement stmt = conn.createStatement();
- // ensure statement is closed (to not leak) when exchange is done
- exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
- @Override
- public void onDone(Exchange exchange) {
- closeQuietly(stmt);
- }
- });
-
+ stmt = conn.createStatement();
bindParameters(exchange, stmt);
LOG.debug("Executing JDBC Statement: {}", sql);
@@ -250,6 +284,23 @@ public class JdbcProducer extends DefaultProducer {
} finally {
if (shouldCloseResources) {
closeQuietly(rs);
+ closeQuietly(stmt);
+ } else {
+ final Statement finalStmt = stmt;
+ final ResultSet finalRs = rs;
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ closeQuietly(finalRs);
+ closeQuietly(finalStmt);
+ }
+
+ @Override
+ public int getOrder() {
+ // Make sure it happens before close Connection.
+ return LOWEST + 100;
+ }
+ });
}
}
return shouldCloseResources;
@@ -346,7 +397,6 @@ public class JdbcProducer extends DefaultProducer {
.setBody(new StreamListIterator(
getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(),
iterator));
- exchange.adapt(ExtendedExchange.class).addOnCompletion(new ResultSetIteratorCompletion(iterator));
// do not close resources as we are in streaming mode
answer = false;
} else if (outputType == JdbcOutputType.SelectList) {
@@ -395,24 +445,4 @@ public class JdbcProducer extends DefaultProducer {
return row;
}
}
-
- private static final class ResultSetIteratorCompletion implements Synchronization {
- private final ResultSetIterator iterator;
-
- private ResultSetIteratorCompletion(ResultSetIterator iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public void onComplete(Exchange exchange) {
- iterator.close();
- iterator.closeConnection();
- }
-
- @Override
- public void onFailure(Exchange exchange) {
- iterator.close();
- iterator.closeConnection();
- }
- }
}