You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/29 10:23:06 UTC
[3/5] camel git commit: CAMEL-8715: camel-sql - Should close ResultSet
CAMEL-8715: camel-sql - Should close ResultSet
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cc9160ac
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cc9160ac
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cc9160ac
Branch: refs/heads/master
Commit: cc9160ac7e416decbddf9f545952f7a7c3ef5d80
Parents: cd95b8b
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Apr 29 08:03:16 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 29 10:26:31 2015 +0200
----------------------------------------------------------------------
.../apache/camel/component/sql/SqlConsumer.java | 5 +-
.../apache/camel/component/sql/SqlEndpoint.java | 5 +-
.../apache/camel/component/sql/SqlProducer.java | 145 ++++++++++---------
3 files changed, 82 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index a9e4b49..cdddd81 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -21,7 +21,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Queue;
import org.apache.camel.Exchange;
@@ -36,6 +35,8 @@ import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
+import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
+
public class SqlConsumer extends ScheduledBatchPollingConsumer {
private final String query;
@@ -110,7 +111,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
throw new IllegalArgumentException("Invalid outputType=" + outputType);
}
} finally {
- rs.close();
+ closeResultSet(rs);
}
// process all the exchanges in this batch
http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
index 9ddbda9..507b071 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
@@ -346,10 +346,11 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
return "sql:" + UnsafeUriCharactersEncoder.encode(query);
}
+ @SuppressWarnings("unchecked")
protected List<?> queryForList(ResultSet rs, boolean allowMapToClass) throws SQLException {
if (allowMapToClass && outputClass != null) {
- Class<?> outputClzz = getCamelContext().getClassResolver().resolveClass(outputClass);
- RowMapper rowMapper = new BeanPropertyRowMapper(outputClzz);
+ Class<?> outputClazz = getCamelContext().getClassResolver().resolveClass(outputClass);
+ RowMapper rowMapper = new BeanPropertyRowMapper(outputClazz);
RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper);
List<?> data = mapper.extractData(rs);
return data;
http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index a4554a8..c5eda07 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -32,6 +32,8 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.jdbc.core.PreparedStatementCreator;
+import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
+
public class SqlProducer extends DefaultProducer {
private String query;
private JdbcTemplate jdbcTemplate;
@@ -89,59 +91,48 @@ public class SqlProducer extends DefaultProducer {
jdbcTemplate.execute(statementCreator, new PreparedStatementCallback<Map<?, ?>>() {
public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
- int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount();
-
- // only populate if really needed
- if (alwaysPopulateStatement || expected > 0) {
- // transfer incoming message body data to prepared statement parameters, if necessary
- if (batch) {
- Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
- while (iterator != null && iterator.hasNext()) {
- Object value = iterator.next();
- Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+ ResultSet rs = null;
+ try {
+ int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount();
+
+ // only populate if really needed
+ if (alwaysPopulateStatement || expected > 0) {
+ // transfer incoming message body data to prepared statement parameters, if necessary
+ if (batch) {
+ Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
+ while (iterator != null && iterator.hasNext()) {
+ Object value = iterator.next();
+ Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+ sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+ ps.addBatch();
+ }
+ } else {
+ Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody());
sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
- ps.addBatch();
}
- } else {
- Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody());
- sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
}
- }
- boolean isResultSet = false;
+ boolean isResultSet = false;
- // execute the prepared statement and populate the outgoing message
- if (batch) {
- int[] updateCounts = ps.executeBatch();
- int total = 0;
- for (int count : updateCounts) {
- total += count;
- }
- exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total);
- } else {
- isResultSet = ps.execute();
- if (isResultSet) {
- // preserve headers first, so we can override the SQL_ROW_COUNT header
- exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
-
- ResultSet rs = ps.getResultSet();
- SqlOutputType outputType = getEndpoint().getOutputType();
- log.trace("Got result list from query: {}, outputType={}", rs, outputType);
- if (outputType == SqlOutputType.SelectList) {
- List<?> data = getEndpoint().queryForList(rs, true);
- // for noop=true we still want to enrich with the row count header
- if (getEndpoint().isNoop()) {
- exchange.getOut().setBody(exchange.getIn().getBody());
- } else if (getEndpoint().getOutputHeader() != null) {
- exchange.getOut().setBody(exchange.getIn().getBody());
- exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
- } else {
- exchange.getOut().setBody(data);
- }
- exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size());
- } else if (outputType == SqlOutputType.SelectOne) {
- Object data = getEndpoint().queryForObject(rs);
- if (data != null) {
+ // execute the prepared statement and populate the outgoing message
+ if (batch) {
+ int[] updateCounts = ps.executeBatch();
+ int total = 0;
+ for (int count : updateCounts) {
+ total += count;
+ }
+ exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total);
+ } else {
+ isResultSet = ps.execute();
+ if (isResultSet) {
+ // preserve headers first, so we can override the SQL_ROW_COUNT header
+ exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+
+ rs = ps.getResultSet();
+ SqlOutputType outputType = getEndpoint().getOutputType();
+ log.trace("Got result list from query: {}, outputType={}", rs, outputType);
+ if (outputType == SqlOutputType.SelectList) {
+ List<?> data = getEndpoint().queryForList(rs, true);
// for noop=true we still want to enrich with the row count header
if (getEndpoint().isNoop()) {
exchange.getOut().setBody(exchange.getIn().getBody());
@@ -151,35 +142,51 @@ public class SqlProducer extends DefaultProducer {
} else {
exchange.getOut().setBody(data);
}
- exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1);
+ exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size());
+ } else if (outputType == SqlOutputType.SelectOne) {
+ Object data = getEndpoint().queryForObject(rs);
+ if (data != null) {
+ // for noop=true we still want to enrich with the row count header
+ if (getEndpoint().isNoop()) {
+ exchange.getOut().setBody(exchange.getIn().getBody());
+ } else if (getEndpoint().getOutputHeader() != null) {
+ exchange.getOut().setBody(exchange.getIn().getBody());
+ exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
+ } else {
+ exchange.getOut().setBody(data);
+ }
+ exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1);
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid outputType=" + outputType);
}
} else {
- throw new IllegalArgumentException("Invalid outputType=" + outputType);
+ exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
}
- } else {
- exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
}
- }
- if (shouldRetrieveGeneratedKeys) {
- // if no OUT message yet then create one and propagate headers
- if (!exchange.hasOut()) {
- exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
- }
+ if (shouldRetrieveGeneratedKeys) {
+ // if no OUT message yet then create one and propagate headers
+ if (!exchange.hasOut()) {
+ exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+ }
- if (isResultSet) {
- // we won't return generated keys for SELECT statements
- exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, Collections.EMPTY_LIST);
- exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, 0);
- } else {
- List<?> generatedKeys = getEndpoint().queryForList(ps.getGeneratedKeys(), false);
- exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, generatedKeys);
- exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, generatedKeys.size());
+ if (isResultSet) {
+ // we won't return generated keys for SELECT statements
+ exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, Collections.EMPTY_LIST);
+ exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, 0);
+ } else {
+ List<?> generatedKeys = getEndpoint().queryForList(ps.getGeneratedKeys(), false);
+ exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, generatedKeys);
+ exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, generatedKeys.size());
+ }
}
- }
- // data is set on exchange so return null
- return null;
+ // data is set on exchange so return null
+ return null;
+ } finally {
+ closeResultSet(rs);
+ }
}
});
}