You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@camel.apache.org by da...@apache.org on 2015/04/29 10:23:06 UTC

[3/5] camel git commit: CAMEL-8715: camel-sql - Should close ResultSet

CAMEL-8715: camel-sql - Should close ResultSet


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cc9160ac
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cc9160ac
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cc9160ac

Branch: refs/heads/master
Commit: cc9160ac7e416decbddf9f545952f7a7c3ef5d80
Parents: cd95b8b
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Apr 29 08:03:16 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 29 10:26:31 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/component/sql/SqlConsumer.java |   5 +-
 .../apache/camel/component/sql/SqlEndpoint.java |   5 +-
 .../apache/camel/component/sql/SqlProducer.java | 145 ++++++++++---------
 3 files changed, 82 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index a9e4b49..cdddd81 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -21,7 +21,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 
 import org.apache.camel.Exchange;
@@ -36,6 +35,8 @@ import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
 
+import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
+
 public class SqlConsumer extends ScheduledBatchPollingConsumer {
 
     private final String query;
@@ -110,7 +111,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
                         throw new IllegalArgumentException("Invalid outputType=" + outputType);
                     }
                 } finally {
-                    rs.close();
+                    closeResultSet(rs);
                 }
 
                 // process all the exchanges in this batch

http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
index 9ddbda9..507b071 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
@@ -346,10 +346,11 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
         return "sql:" + UnsafeUriCharactersEncoder.encode(query);
     }
 
+    @SuppressWarnings("unchecked")
     protected List<?> queryForList(ResultSet rs, boolean allowMapToClass) throws SQLException {
         if (allowMapToClass && outputClass != null) {
-            Class<?> outputClzz = getCamelContext().getClassResolver().resolveClass(outputClass);
-            RowMapper rowMapper = new BeanPropertyRowMapper(outputClzz);
+            Class<?> outputClazz = getCamelContext().getClassResolver().resolveClass(outputClass);
+            RowMapper rowMapper = new BeanPropertyRowMapper(outputClazz);
             RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper);
             List<?> data = mapper.extractData(rs);
             return data;

http://git-wip-us.apache.org/repos/asf/camel/blob/cc9160ac/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index a4554a8..c5eda07 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -32,6 +32,8 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
 import org.springframework.jdbc.core.PreparedStatementCreator;
 
+import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
+
 public class SqlProducer extends DefaultProducer {
     private String query;
     private JdbcTemplate jdbcTemplate;
@@ -89,59 +91,48 @@ public class SqlProducer extends DefaultProducer {
 
         jdbcTemplate.execute(statementCreator, new PreparedStatementCallback<Map<?, ?>>() {
             public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
-                int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount();
-
-                // only populate if really needed
-                if (alwaysPopulateStatement || expected > 0) {
-                    // transfer incoming message body data to prepared statement parameters, if necessary
-                    if (batch) {
-                        Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
-                        while (iterator != null && iterator.hasNext()) {
-                            Object value = iterator.next();
-                            Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+                ResultSet rs = null;
+                try {
+                    int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount();
+
+                    // only populate if really needed
+                    if (alwaysPopulateStatement || expected > 0) {
+                        // transfer incoming message body data to prepared statement parameters, if necessary
+                        if (batch) {
+                            Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
+                            while (iterator != null && iterator.hasNext()) {
+                                Object value = iterator.next();
+                                Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+                                sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+                                ps.addBatch();
+                            }
+                        } else {
+                            Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody());
                             sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
-                            ps.addBatch();
                         }
-                    } else {
-                        Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody());
-                        sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
                     }
-                }
 
-                boolean isResultSet = false;
+                    boolean isResultSet = false;
 
-                // execute the prepared statement and populate the outgoing message
-                if (batch) {
-                    int[] updateCounts = ps.executeBatch();
-                    int total = 0;
-                    for (int count : updateCounts) {
-                        total += count;
-                    }
-                    exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total);
-                } else {
-                    isResultSet = ps.execute();
-                    if (isResultSet) {
-                        // preserve headers first, so we can override the SQL_ROW_COUNT header
-                        exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
-
-                        ResultSet rs = ps.getResultSet();
-                        SqlOutputType outputType = getEndpoint().getOutputType();
-                        log.trace("Got result list from query: {}, outputType={}", rs, outputType);
-                        if (outputType == SqlOutputType.SelectList) {
-                            List<?> data = getEndpoint().queryForList(rs, true);
-                            // for noop=true we still want to enrich with the row count header
-                            if (getEndpoint().isNoop()) {
-                                exchange.getOut().setBody(exchange.getIn().getBody());
-                            } else if (getEndpoint().getOutputHeader() != null) {
-                                exchange.getOut().setBody(exchange.getIn().getBody());
-                                exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
-                            } else {
-                                exchange.getOut().setBody(data);
-                            }
-                            exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size());
-                        } else if (outputType == SqlOutputType.SelectOne) {
-                            Object data = getEndpoint().queryForObject(rs);
-                            if (data != null) {
+                    // execute the prepared statement and populate the outgoing message
+                    if (batch) {
+                        int[] updateCounts = ps.executeBatch();
+                        int total = 0;
+                        for (int count : updateCounts) {
+                            total += count;
+                        }
+                        exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total);
+                    } else {
+                        isResultSet = ps.execute();
+                        if (isResultSet) {
+                            // preserve headers first, so we can override the SQL_ROW_COUNT header
+                            exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+
+                            rs = ps.getResultSet();
+                            SqlOutputType outputType = getEndpoint().getOutputType();
+                            log.trace("Got result list from query: {}, outputType={}", rs, outputType);
+                            if (outputType == SqlOutputType.SelectList) {
+                                List<?> data = getEndpoint().queryForList(rs, true);
                                 // for noop=true we still want to enrich with the row count header
                                 if (getEndpoint().isNoop()) {
                                     exchange.getOut().setBody(exchange.getIn().getBody());
@@ -151,35 +142,51 @@ public class SqlProducer extends DefaultProducer {
                                 } else {
                                     exchange.getOut().setBody(data);
                                 }
-                                exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1);
+                                exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, data.size());
+                            } else if (outputType == SqlOutputType.SelectOne) {
+                                Object data = getEndpoint().queryForObject(rs);
+                                if (data != null) {
+                                    // for noop=true we still want to enrich with the row count header
+                                    if (getEndpoint().isNoop()) {
+                                        exchange.getOut().setBody(exchange.getIn().getBody());
+                                    } else if (getEndpoint().getOutputHeader() != null) {
+                                        exchange.getOut().setBody(exchange.getIn().getBody());
+                                        exchange.getOut().setHeader(getEndpoint().getOutputHeader(), data);
+                                    } else {
+                                        exchange.getOut().setBody(data);
+                                    }
+                                    exchange.getOut().setHeader(SqlConstants.SQL_ROW_COUNT, 1);
+                                }
+                            } else {
+                                throw new IllegalArgumentException("Invalid outputType=" + outputType);
                             }
                         } else {
-                            throw new IllegalArgumentException("Invalid outputType=" + outputType);
+                            exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
                         }
-                    } else {
-                        exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
                     }
-                }
 
-                if (shouldRetrieveGeneratedKeys) {
-                    // if no OUT message yet then create one and propagate headers
-                    if (!exchange.hasOut()) {
-                        exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
-                    }
+                    if (shouldRetrieveGeneratedKeys) {
+                        // if no OUT message yet then create one and propagate headers
+                        if (!exchange.hasOut()) {
+                            exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+                        }
 
-                    if (isResultSet) {
-                        // we won't return generated keys for SELECT statements
-                        exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, Collections.EMPTY_LIST);
-                        exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, 0);
-                    } else {
-                        List<?> generatedKeys = getEndpoint().queryForList(ps.getGeneratedKeys(), false);
-                        exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, generatedKeys);
-                        exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, generatedKeys.size());
+                        if (isResultSet) {
+                            // we won't return generated keys for SELECT statements
+                            exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, Collections.EMPTY_LIST);
+                            exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, 0);
+                        } else {
+                            List<?> generatedKeys = getEndpoint().queryForList(ps.getGeneratedKeys(), false);
+                            exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_DATA, generatedKeys);
+                            exchange.getOut().setHeader(SqlConstants.SQL_GENERATED_KEYS_ROW_COUNT, generatedKeys.size());
+                        }
                     }
-                }
 
-                // data is set on exchange so return null
-                return null;
+                    // data is set on exchange so return null
+                    return null;
+                } finally {
+                    closeResultSet(rs);
+                }
             }
         });
     }