You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/30 10:48:33 UTC
camel git commit: CAMEL-9849: camel-sql - Add support for
OutputType=StreamList in the producer
Repository: camel
Updated Branches:
refs/heads/master 32417731b -> dda7ca31d
CAMEL-9849: camel-sql - Add support for OutputType=StreamList in the producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dda7ca31
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dda7ca31
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dda7ca31
Branch: refs/heads/master
Commit: dda7ca31d822315f574f588ab1fdd7fccf0562ca
Parents: 3241773
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 30 10:48:24 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 30 10:48:24 2016 +0200
----------------------------------------------------------------------
.../camel/component/sql/DefaultSqlEndpoint.java | 18 ++-
.../camel/component/sql/ResultSetIterator.java | 114 +++++++++++++++++
.../sql/ResultSetIteratorCompletion.java | 35 ++++++
.../camel/component/sql/SqlOutputType.java | 2 +-
.../apache/camel/component/sql/SqlProducer.java | 79 +++++++++++-
.../camel/component/sql/ProjectModel.java | 5 +
.../SqlProducerOutputTypeStreamListTest.java | 123 +++++++++++++++++++
7 files changed, 373 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
index 8a13a74..abcfc95 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
@@ -16,9 +16,11 @@
*/
package org.apache.camel.component.sql;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
@@ -92,7 +94,9 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
+ "b) If the query has more than one column, then it will return a Map of that result."
+ "c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names."
+ "It will assume your class has a default constructor to create instance with."
- + "d) If the query resulted in more than one rows, it throws an non-unique result exception.")
+ + "d) If the query resulted in more than one rows, it throws an non-unique result exception."
+ + "StreamList can only be used by the producer that streams the result of the query using an Iterator. This can be used with the Splitter EIP in streaming"
+ + " mode to process the ResultSet in streaming fashion.")
private SqlOutputType outputType = SqlOutputType.SelectList;
@UriParam(description = "Specify the full package and class name to use as conversion when outputType=SelectOne.")
private String outputClass;
@@ -494,4 +498,16 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
return result;
}
+    /**
+     * Wraps an already opened JDBC result set in a {@link ResultSetIterator} so the rows
+     * can be consumed one at a time.
+     * <p/>
+     * When no outputClass is configured each row is mapped with a {@link ColumnMapRowMapper};
+     * otherwise the configured class is resolved and each row is mapped with a
+     * {@link BeanPropertyRowMapper} for that class.
+     *
+     * @param connection the connection handed on to the iterator
+     * @param statement  the statement handed on to the iterator
+     * @param rs         the result set to iterate
+     * @return the iterator over the mapped rows
+     * @throws SQLException is thrown if the iterator cannot be created
+     */
+    @SuppressWarnings("unchecked")
+    public ResultSetIterator queryForStreamList(Connection connection, Statement statement, ResultSet rs) throws SQLException {
+        RowMapper rowMapper;
+        if (outputClass != null) {
+            Class<?> beanType = getCamelContext().getClassResolver().resolveClass(outputClass);
+            rowMapper = new BeanPropertyRowMapper(beanType);
+        } else {
+            rowMapper = new ColumnMapRowMapper();
+        }
+        return new ResultSetIterator(connection, statement, rs, rowMapper);
+    }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIterator.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIterator.java
new file mode 100644
index 0000000..837735d
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIterator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.io.Closeable;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.RuntimeCamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.RowMapper;
+
+/**
+ * An {@link Iterator} that walks a JDBC {@link ResultSet} row by row, mapping each row
+ * with the given {@link RowMapper}.
+ * <p/>
+ * The iterator takes over the connection, statement and result set it is given: all three
+ * are closed when the last row has been read, when mapping a row fails with a
+ * {@link SQLException}, or when {@link #close()} is called explicitly. Closing happens
+ * at most once.
+ */
+public class ResultSetIterator implements Iterator, Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResultSetIterator.class);
+
+    private final Connection connection;
+    private final Statement statement;
+    private final ResultSet resultSet;
+    private final RowMapper rowMapper;
+    // flipped to true exactly once, by whoever closes the iterator first
+    private final AtomicBoolean closed = new AtomicBoolean();
+    // index of the next row passed to the row mapper
+    private int rowNum;
+
+    /**
+     * Creates the iterator and moves the cursor to the first row.
+     * If the result set is empty the iterator closes itself right away.
+     *
+     * @throws SQLException is thrown if the cursor cannot be moved to the first row
+     */
+    public ResultSetIterator(Connection connection, Statement statement, ResultSet resultSet, RowMapper rowMapper) throws SQLException {
+        this.connection = connection;
+        this.statement = statement;
+        this.resultSet = resultSet;
+        this.rowMapper = rowMapper;
+
+        loadNext();
+    }
+
+    /**
+     * There are more rows for as long as the iterator has not been closed.
+     */
+    @Override
+    public boolean hasNext() {
+        return !closed.get();
+    }
+
+    /**
+     * Maps the current row and then advances the cursor to the following one.
+     *
+     * @throws NoSuchElementException if there are no more rows
+     * @throws RuntimeCamelException  if mapping the row or advancing the cursor fails with a {@link SQLException}
+     */
+    @Override
+    public Object next() {
+        if (hasNext()) {
+            try {
+                Object row = rowMapper.mapRow(resultSet, rowNum++);
+                loadNext();
+                return row;
+            } catch (SQLException e) {
+                close();
+                throw new RuntimeCamelException("Cannot process result", e);
+            }
+        }
+        throw new NoSuchElementException();
+    }
+
+    /**
+     * Advances the cursor; closes the iterator when the result set is exhausted.
+     */
+    private void loadNext() throws SQLException {
+        if (!resultSet.next()) {
+            close();
+        }
+    }
+
+    /**
+     * Closes the result set, the statement and the connection (in that order).
+     * Only the first call has an effect; errors while closing are logged and ignored.
+     */
+    @Override
+    public void close() {
+        if (!closed.compareAndSet(false, true)) {
+            // already closed
+            return;
+        }
+        quietlyCloseResultSet();
+        quietlyCloseStatement();
+        quietlyCloseConnection();
+    }
+
+    private void quietlyCloseResultSet() {
+        try {
+            resultSet.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing result set: " + e, e);
+        }
+    }
+
+    private void quietlyCloseStatement() {
+        try {
+            statement.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing statement: " + e, e);
+        }
+    }
+
+    private void quietlyCloseConnection() {
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing connection: " + e, e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIteratorCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIteratorCompletion.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIteratorCompletion.java
new file mode 100644
index 0000000..59625d3
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIteratorCompletion.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.SynchronizationAdapter;
+
+/**
+ * Completion callback that closes a {@link ResultSetIterator} once the
+ * {@link Exchange} it was registered on is done.
+ */
+public class ResultSetIteratorCompletion extends SynchronizationAdapter {
+
+    private final ResultSetIterator resultSetIterator;
+
+    /**
+     * @param iterator the iterator to close when the exchange is done
+     */
+    public ResultSetIteratorCompletion(ResultSetIterator iterator) {
+        this.resultSetIterator = iterator;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        resultSetIterator.close();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java
index 86a1bf6..216a908 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java
@@ -18,5 +18,5 @@ package org.apache.camel.component.sql;
public enum SqlOutputType {
- SelectOne, SelectList
+ SelectOne, SelectList, StreamList
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index da5f065..bf44374 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -32,7 +32,9 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.jdbc.core.PreparedStatementCreator;
+import static org.springframework.jdbc.support.JdbcUtils.closeConnection;
import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
+import static org.springframework.jdbc.support.JdbcUtils.closeStatement;
public class SqlProducer extends DefaultProducer {
private final String query;
@@ -78,7 +80,6 @@ public class SqlProducer extends DefaultProducer {
}
final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sql, getEndpoint().isAllowNamedParameters(), exchange);
- // CAMEL-7313 - check whether to return generated keys
final Boolean shouldRetrieveGeneratedKeys =
exchange.getIn().getHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, false, Boolean.class);
@@ -104,6 +105,14 @@ public class SqlProducer extends DefaultProducer {
}
};
+ // special for processing stream list (batch not supported)
+ SqlOutputType outputType = getEndpoint().getOutputType();
+ if (outputType == SqlOutputType.StreamList) {
+ processStreamList(exchange, statementCreator, sql, preparedQuery);
+ return;
+ }
+
+ log.trace("jdbcTemplate.execute: {}", preparedQuery);
jdbcTemplate.execute(statementCreator, new PreparedStatementCallback<Map<?, ?>>() {
public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
ResultSet rs = null;
@@ -224,6 +233,74 @@ public class SqlProducer extends DefaultProducer {
});
}
+    /**
+     * Executes the query and hands the result over as a {@link ResultSetIterator}
+     * instead of loading all rows up front.
+     * <p/>
+     * The connection, statement and result set are deliberately kept open after this method
+     * returns; the iterator owns them and is closed by a {@link ResultSetIteratorCompletion}
+     * registered on the exchange. If the statement does not produce a result set, or if
+     * anything fails, the JDBC resources are released here before returning or rethrowing.
+     *
+     * @param exchange         the current exchange; receives the iterator as body or header
+     * @param statementCreator creates the prepared statement on the obtained connection
+     * @param sql              the original query
+     * @param preparedQuery    the prepared query
+     * @throws Exception is thrown if preparing, populating or executing the statement fails
+     */
+    protected void processStreamList(Exchange exchange, PreparedStatementCreator statementCreator, String sql, String preparedQuery) throws Exception {
+        log.trace("processStreamList: {}", preparedQuery);
+
+        // do not use the jdbcTemplate as it will auto-close connection/ps/rs when exiting the execute method
+        // and we need to keep the connection alive while routing and close it when the Exchange is done being routed
+        Connection con = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            con = jdbcTemplate.getDataSource().getConnection();
+            ps = statementCreator.createPreparedStatement(con);
+
+            int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount();
+
+            // only populate if really needed
+            if (alwaysPopulateStatement || expected > 0) {
+                // transfer incoming message body data to prepared statement parameters, if necessary
+                if (batch) {
+                    Iterator<?> iterator;
+                    if (useMessageBodyForSql) {
+                        iterator = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
+                    } else {
+                        iterator = exchange.getIn().getBody(Iterator.class);
+                    }
+                    while (iterator != null && iterator.hasNext()) {
+                        Object value = iterator.next();
+                        Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+                        sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+                        ps.addBatch();
+                    }
+                } else {
+                    Object value;
+                    if (useMessageBodyForSql) {
+                        value = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
+                    } else {
+                        value = exchange.getIn().getBody();
+                    }
+                    Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+                    sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+                }
+            }
+
+            boolean isResultSet = ps.execute();
+            if (isResultSet) {
+                rs = ps.getResultSet();
+                ResultSetIterator iterator = getEndpoint().queryForStreamList(con, ps, rs);
+                if (getEndpoint().isNoop()) {
+                    exchange.getOut().setBody(exchange.getIn().getBody());
+                } else if (getEndpoint().getOutputHeader() != null) {
+                    exchange.getOut().setBody(exchange.getIn().getBody());
+                    exchange.getOut().setHeader(getEndpoint().getOutputHeader(), iterator);
+                } else {
+                    exchange.getOut().setBody(iterator);
+                }
+                // we do not know the row count so we cannot set a ROW_COUNT header
+                // defer closing the iterator when the exchange is complete
+                exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator));
+            } else {
+                // no result set means no iterator and no completion callback to close the
+                // resources later, so release them now to avoid leaking the connection
+                closeStatement(ps);
+                closeConnection(con);
+            }
+        } catch (Exception e) {
+            // in case of exception then close all this before rethrow,
+            // in reverse order of acquisition: result set, statement, connection
+            closeResultSet(rs);
+            closeStatement(ps);
+            closeConnection(con);
+            throw e;
+        }
+    }
+
public void setParametersCount(int parametersCount) {
this.parametersCount = parametersCount;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java
index 2160800..f7d01d6 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java
@@ -45,4 +45,9 @@ public class ProjectModel {
public String getLicense() {
return license;
}
+
+ @Override
+ public String toString() {
+ return "ProjectModel{id=" + id + ", project='" + project + '\'' + ", license='" + license + '\'' + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
new file mode 100644
index 0000000..a0a4034
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+public class SqlProducerOutputTypeStreamListTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testReturnAnIterator() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "testmsg");
+
+ mock.assertIsSatisfied();
+ assertThat(resultBodyAt(mock, 0), instanceOf(Iterator.class));
+ }
+
+ @Test
+ public void testSplit() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(3);
+
+ template.sendBody("direct:withSplit", "testmsg");
+
+ mock.assertIsSatisfied();
+ assertThat(resultBodyAt(mock, 0), instanceOf(Map.class));
+ assertThat(resultBodyAt(mock, 1), instanceOf(Map.class));
+ assertThat(resultBodyAt(mock, 2), instanceOf(Map.class));
+ }
+
+ @Test
+ public void testSplitWithModel() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(3);
+
+ template.sendBody("direct:withSplitModel", "testmsg");
+
+ mock.assertIsSatisfied();
+ assertThat(resultBodyAt(mock, 0), instanceOf(ProjectModel.class));
+ assertThat(resultBodyAt(mock, 1), instanceOf(ProjectModel.class));
+ assertThat(resultBodyAt(mock, 2), instanceOf(ProjectModel.class));
+ }
+
+ private Object resultBodyAt(MockEndpoint result, int index) {
+ return result.assertExchangeReceived(index).getIn().getBody();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+ from("direct:start")
+ .to("sql:select * from projects order by id?outputType=StreamList")
+ .to("log:stream")
+ .to("mock:result");
+
+ from("direct:withSplit")
+ .to("sql:select * from projects order by id?outputType=StreamList")
+ .to("log:stream")
+ .split(body()).streaming()
+ .to("log:row")
+ .to("mock:result")
+ .end();
+
+ from("direct:withSplitModel")
+ .to("sql:select * from projects order by id?outputType=StreamList&outputClass=org.apache.camel.component.sql.ProjectModel")
+ .to("log:stream")
+ .split(body()).streaming()
+ .to("log:row")
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+}