You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/30 11:19:37 UTC
camel git commit: CAMEL-9849: camel-elsql - Add support for
OutputType=StreamList in the producer
Repository: camel
Updated Branches:
refs/heads/master 893d25645 -> 88bed23b0
CAMEL-9849: camel-elsql - Add support for OutputType=StreamList in the producer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/88bed23b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/88bed23b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/88bed23b
Branch: refs/heads/master
Commit: 88bed23b0900234658cca32d7bdc940e965e8a79
Parents: 893d256
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 30 11:19:00 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 30 11:19:00 2016 +0200
----------------------------------------------------------------------
.../camel/component/elsql/ElsqlComponent.java | 2 +-
.../camel/component/elsql/ElsqlEndpoint.java | 8 +-
.../camel/component/elsql/ElsqlProducer.java | 75 ++++++++++-
.../elsql/ElSqlProducerStreamListTest.java | 128 +++++++++++++++++++
.../apache/camel/component/elsql/Project.java | 6 +
5 files changed, 215 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
index 145dd8e..b328a72 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
@@ -84,7 +84,7 @@ public class ElsqlComponent extends UriEndpointComponent {
onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete", String.class);
}
- ElsqlEndpoint endpoint = new ElsqlEndpoint(uri, this, jdbcTemplate, elsqlName, resUri);
+ ElsqlEndpoint endpoint = new ElsqlEndpoint(uri, this, jdbcTemplate, target, elsqlName, resUri);
endpoint.setElSqlConfig(elSqlConfig);
endpoint.setDatabaseVendor(databaseVendor);
endpoint.setDataSource(ds);
http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
index ab09ab0..8d67073 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.elsql;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import javax.sql.DataSource;
import com.opengamma.elsql.ElSql;
import com.opengamma.elsql.ElSqlConfig;
@@ -52,6 +53,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
private ElSql elSql;
private NamedParameterJdbcTemplate namedJdbcTemplate;
+ private DataSource dataSource;
@UriPath
@Metadata(required = "true")
@@ -63,11 +65,13 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
@UriParam
private ElSqlConfig elSqlConfig;
- public ElsqlEndpoint(String uri, Component component, NamedParameterJdbcTemplate namedJdbcTemplate, String elsqlName, String resourceUri) {
+ public ElsqlEndpoint(String uri, Component component, NamedParameterJdbcTemplate namedJdbcTemplate, DataSource dataSource,
+ String elsqlName, String resourceUri) {
super(uri, component, null);
this.elsqlName = elsqlName;
this.resourceUri = resourceUri;
this.namedJdbcTemplate = namedJdbcTemplate;
+ this.dataSource = dataSource;
}
@Override
@@ -94,7 +98,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
@Override
public Producer createProducer() throws Exception {
- ElsqlProducer result = new ElsqlProducer(this, elSql, elsqlName, namedJdbcTemplate);
+ ElsqlProducer result = new ElsqlProducer(this, elSql, elsqlName, namedJdbcTemplate, dataSource);
return result;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
index 6cd236b..e2783b5 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
@@ -16,14 +16,18 @@
*/
package org.apache.camel.component.elsql;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
+import javax.sql.DataSource;
import com.opengamma.elsql.ElSql;
import com.opengamma.elsql.SpringSqlParams;
import org.apache.camel.Exchange;
+import org.apache.camel.component.sql.ResultSetIterator;
+import org.apache.camel.component.sql.ResultSetIteratorCompletion;
import org.apache.camel.component.sql.SqlConstants;
import org.apache.camel.component.sql.SqlOutputType;
import org.apache.camel.impl.DefaultProducer;
@@ -31,10 +35,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.PreparedStatementCreator;
+import org.springframework.jdbc.core.PreparedStatementCreatorFactory;
+import org.springframework.jdbc.core.SqlParameter;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.NamedParameterUtils;
+import org.springframework.jdbc.core.namedparam.ParsedSql;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+import static org.springframework.jdbc.support.JdbcUtils.closeConnection;
import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
+import static org.springframework.jdbc.support.JdbcUtils.closeStatement;
public class ElsqlProducer extends DefaultProducer {
@@ -42,12 +53,14 @@ public class ElsqlProducer extends DefaultProducer {
private final ElSql elSql;
private final String elSqlName;
private final NamedParameterJdbcTemplate jdbcTemplate;
+ private final DataSource dataSource;
- public ElsqlProducer(ElsqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate) {
+ public ElsqlProducer(ElsqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate, DataSource dataSource) {
super(endpoint);
this.elSql = elSql;
this.elSqlName = elSqlName;
this.jdbcTemplate = jdbcTemplate;
+ this.dataSource = dataSource;
}
@Override
@@ -63,6 +76,14 @@ public class ElsqlProducer extends DefaultProducer {
final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param));
LOG.debug("ElsqlProducer @{} using sql: {}", elSqlName, sql);
+ // special for processing stream list (batch not supported)
+ SqlOutputType outputType = getEndpoint().getOutputType();
+ if (outputType == SqlOutputType.StreamList) {
+ processStreamList(exchange, sql, param);
+ return;
+ }
+
+ log.trace("jdbcTemplate.execute: {}", sql);
jdbcTemplate.execute(sql, param, new PreparedStatementCallback<Object>() {
@Override
public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
@@ -122,4 +143,56 @@ public class ElsqlProducer extends DefaultProducer {
}
});
}
+
+ protected void processStreamList(Exchange exchange, String sql, SqlParameterSource param) throws Exception {
+ // spring JDBC to parse the SQL and build the prepared statement creator
+ // this is what NamedJdbcTemplate does internally
+ ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(sql);
+ String sqlToUse = NamedParameterUtils.substituteNamedParameters(parsedSql, param);
+ Object[] params = NamedParameterUtils.buildValueArray(parsedSql, param, null);
+ List<SqlParameter> declaredParameters = NamedParameterUtils.buildSqlParameterList(parsedSql, param);
+ PreparedStatementCreatorFactory pscf = new PreparedStatementCreatorFactory(sqlToUse, declaredParameters);
+ PreparedStatementCreator statementCreator = pscf.newPreparedStatementCreator(params);
+
+ processStreamList(exchange, statementCreator, sqlToUse);
+ }
+
+ protected void processStreamList(Exchange exchange, PreparedStatementCreator statementCreator, String preparedQuery) throws Exception {
+ log.trace("processStreamList: {}", preparedQuery);
+
+ // do not use the jdbcTemplate as it will auto-close connection/ps/rs when exiting the execute method
+ // and we need to keep the connection alive while routing and close it when the Exchange is done being routed
+ Connection con = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+
+ try {
+ con = dataSource.getConnection();
+ ps = statementCreator.createPreparedStatement(con);
+
+ boolean isResultSet = ps.execute();
+ if (isResultSet) {
+ rs = ps.getResultSet();
+ ResultSetIterator iterator = getEndpoint().queryForStreamList(con, ps, rs);
+ if (getEndpoint().isNoop()) {
+ exchange.getOut().setBody(exchange.getIn().getBody());
+ } else if (getEndpoint().getOutputHeader() != null) {
+ exchange.getOut().setBody(exchange.getIn().getBody());
+ exchange.getOut().setHeader(getEndpoint().getOutputHeader(), iterator);
+ } else {
+ exchange.getOut().setBody(iterator);
+ }
+ // we do not know the row count so we cannot set a ROW_COUNT header
+ // defer closing the iterator when the exchange is complete
+ exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator));
+ }
+ } catch (Exception e) {
+ // in case of exception then close all this before rethrow
+ closeConnection(con);
+ closeStatement(ps);
+ closeResultSet(rs);
+ throw e;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerStreamListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerStreamListTest.java b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerStreamListTest.java
new file mode 100644
index 0000000..f1200c5
--- /dev/null
+++ b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerStreamListTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elsql;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+public class ElSqlProducerStreamListTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+
+ // this is the database we create with some initial data for our unit test
+ db = new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ jndi.bind("dataSource", db);
+
+ return jndi;
+ }
+
+ @Test
+ public void testReturnAnIterator() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "testmsg");
+
+ mock.assertIsSatisfied();
+ assertThat(resultBodyAt(mock, 0), instanceOf(Iterator.class));
+ }
+
+ @Test
+ public void testSplit() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(3);
+
+ template.sendBody("direct:withSplit", "testmsg");
+
+ mock.assertIsSatisfied();
+ assertThat(resultBodyAt(mock, 0), instanceOf(Map.class));
+ assertThat(resultBodyAt(mock, 1), instanceOf(Map.class));
+ assertThat(resultBodyAt(mock, 2), instanceOf(Map.class));
+ }
+
+ @Test
+ public void testSplitWithModel() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(3);
+
+ template.sendBody("direct:withSplitModel", "testmsg");
+
+ mock.assertIsSatisfied();
+ assertThat(resultBodyAt(mock, 0), instanceOf(Project.class));
+ assertThat(resultBodyAt(mock, 1), instanceOf(Project.class));
+ assertThat(resultBodyAt(mock, 2), instanceOf(Project.class));
+ }
+
+ private Object resultBodyAt(MockEndpoint result, int index) {
+ return result.assertExchangeReceived(index).getIn().getBody();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ getContext().getComponent("elsql", ElsqlComponent.class).setDataSource(db);
+
+ from("direct:start")
+ .to("elsql:allProjects:elsql/projects.elsql?outputType=StreamList")
+ .to("log:stream")
+ .to("mock:result");
+
+ from("direct:withSplit")
+ .to("elsql:allProjects:elsql/projects.elsql?outputType=StreamList")
+ .to("log:stream")
+ .split(body()).streaming()
+ .to("log:row")
+ .to("mock:result")
+ .end();
+
+ from("direct:withSplitModel")
+ .to("elsql:allProjects:elsql/projects.elsql?outputType=StreamList&outputClass=org.apache.camel.component.elsql.Project")
+ .to("log:stream")
+ .split(body()).streaming()
+ .to("log:row")
+ .to("mock:result")
+ .end();
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java
index 9f6c19e..2610876 100644
--- a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java
+++ b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java
@@ -45,4 +45,10 @@ public class Project {
public void setName(String name) {
this.name = name;
}
+
+ @Override
+ public String toString() {
+ return "Project{id=" + id + ", name='" + name + '\'' + ", license='" + license + '\'' + '}';
+ }
+
}