You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/30 11:19:37 UTC

camel git commit: CAMEL-9849: camel-elsql - Add support for OutputType=StreamList in the producer

Repository: camel
Updated Branches:
  refs/heads/master 893d25645 -> 88bed23b0


CAMEL-9849: camel-elsql - Add support for OutputType=StreamList in the producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/88bed23b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/88bed23b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/88bed23b

Branch: refs/heads/master
Commit: 88bed23b0900234658cca32d7bdc940e965e8a79
Parents: 893d256
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 30 11:19:00 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 30 11:19:00 2016 +0200

----------------------------------------------------------------------
 .../camel/component/elsql/ElsqlComponent.java   |   2 +-
 .../camel/component/elsql/ElsqlEndpoint.java    |   8 +-
 .../camel/component/elsql/ElsqlProducer.java    |  75 ++++++++++-
 .../elsql/ElSqlProducerStreamListTest.java      | 128 +++++++++++++++++++
 .../apache/camel/component/elsql/Project.java   |   6 +
 5 files changed, 215 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
index 145dd8e..b328a72 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlComponent.java
@@ -84,7 +84,7 @@ public class ElsqlComponent extends UriEndpointComponent {
             onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete", String.class);
         }
 
-        ElsqlEndpoint endpoint = new ElsqlEndpoint(uri, this, jdbcTemplate, elsqlName, resUri);
+        ElsqlEndpoint endpoint = new ElsqlEndpoint(uri, this, jdbcTemplate, target, elsqlName, resUri);
         endpoint.setElSqlConfig(elSqlConfig);
         endpoint.setDatabaseVendor(databaseVendor);
         endpoint.setDataSource(ds);

http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
index ab09ab0..8d67073 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.elsql;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import javax.sql.DataSource;
 
 import com.opengamma.elsql.ElSql;
 import com.opengamma.elsql.ElSqlConfig;
@@ -52,6 +53,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
     private ElSql elSql;
     private NamedParameterJdbcTemplate namedJdbcTemplate;
+    private DataSource dataSource;
 
     @UriPath
     @Metadata(required = "true")
@@ -63,11 +65,13 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
     @UriParam
     private ElSqlConfig elSqlConfig;
 
-    public ElsqlEndpoint(String uri, Component component, NamedParameterJdbcTemplate namedJdbcTemplate, String elsqlName, String resourceUri) {
+    public ElsqlEndpoint(String uri, Component component, NamedParameterJdbcTemplate namedJdbcTemplate, DataSource dataSource,
+                         String elsqlName, String resourceUri) {
         super(uri, component, null);
         this.elsqlName = elsqlName;
         this.resourceUri = resourceUri;
         this.namedJdbcTemplate = namedJdbcTemplate;
+        this.dataSource = dataSource;
     }
 
     @Override
@@ -94,7 +98,7 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        ElsqlProducer result = new ElsqlProducer(this, elSql, elsqlName, namedJdbcTemplate);
+        ElsqlProducer result = new ElsqlProducer(this, elSql, elsqlName, namedJdbcTemplate, dataSource);
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
index 6cd236b..e2783b5 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
@@ -16,14 +16,18 @@
  */
 package org.apache.camel.component.elsql;
 
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
+import javax.sql.DataSource;
 
 import com.opengamma.elsql.ElSql;
 import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Exchange;
+import org.apache.camel.component.sql.ResultSetIterator;
+import org.apache.camel.component.sql.ResultSetIteratorCompletion;
 import org.apache.camel.component.sql.SqlConstants;
 import org.apache.camel.component.sql.SqlOutputType;
 import org.apache.camel.impl.DefaultProducer;
@@ -31,10 +35,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.PreparedStatementCallback;
+import org.springframework.jdbc.core.PreparedStatementCreator;
+import org.springframework.jdbc.core.PreparedStatementCreatorFactory;
+import org.springframework.jdbc.core.SqlParameter;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.jdbc.core.namedparam.NamedParameterUtils;
+import org.springframework.jdbc.core.namedparam.ParsedSql;
 import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
+import static org.springframework.jdbc.support.JdbcUtils.closeConnection;
 import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
+import static org.springframework.jdbc.support.JdbcUtils.closeStatement;
 
 public class ElsqlProducer extends DefaultProducer {
 
@@ -42,12 +53,14 @@ public class ElsqlProducer extends DefaultProducer {
     private final ElSql elSql;
     private final String elSqlName;
     private final NamedParameterJdbcTemplate jdbcTemplate;
+    private final DataSource dataSource;
 
-    public ElsqlProducer(ElsqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate) {
+    public ElsqlProducer(ElsqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate, DataSource dataSource) {
         super(endpoint);
         this.elSql = elSql;
         this.elSqlName = elSqlName;
         this.jdbcTemplate = jdbcTemplate;
+        this.dataSource = dataSource;
     }
 
     @Override
@@ -63,6 +76,14 @@ public class ElsqlProducer extends DefaultProducer {
         final String sql = elSql.getSql(elSqlName, new SpringSqlParams(param));
         LOG.debug("ElsqlProducer @{} using sql: {}", elSqlName, sql);
 
+        // special for processing stream list (batch not supported)
+        SqlOutputType outputType = getEndpoint().getOutputType();
+        if (outputType == SqlOutputType.StreamList) {
+            processStreamList(exchange, sql, param);
+            return;
+        }
+
+        log.trace("jdbcTemplate.execute: {}", sql);
         jdbcTemplate.execute(sql, param, new PreparedStatementCallback<Object>() {
             @Override
             public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
@@ -122,4 +143,56 @@ public class ElsqlProducer extends DefaultProducer {
             }
         });
     }
+
+    protected void processStreamList(Exchange exchange, String sql, SqlParameterSource param) throws Exception {
+        // spring JDBC to parse the SQL and build the prepared statement creator
+        // this is what NamedJdbcTemplate does internally
+        ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(sql);
+        String sqlToUse = NamedParameterUtils.substituteNamedParameters(parsedSql, param);
+        Object[] params = NamedParameterUtils.buildValueArray(parsedSql, param, null);
+        List<SqlParameter> declaredParameters = NamedParameterUtils.buildSqlParameterList(parsedSql, param);
+        PreparedStatementCreatorFactory pscf = new PreparedStatementCreatorFactory(sqlToUse, declaredParameters);
+        PreparedStatementCreator statementCreator = pscf.newPreparedStatementCreator(params);
+
+        processStreamList(exchange, statementCreator, sqlToUse);
+    }
+
+    /**
+     * Executes the prepared query and exposes the result to the exchange as a {@link ResultSetIterator}
+     * (used for OutputType=StreamList).
+     * <p/>
+     * When the statement yields a result set, the connection, statement and result set are intentionally
+     * left open on return, because the rows are read while the exchange is being routed. A
+     * {@link ResultSetIteratorCompletion} is registered on the exchange so the iterator is closed when the
+     * exchange is complete. When the statement yields no result set, or when anything fails, the JDBC
+     * resources are closed here.
+     *
+     * @param exchange         the exchange that receives the iterator (as body or as the configured output header)
+     * @param statementCreator creates the prepared statement on the connection obtained from the data source
+     * @param preparedQuery    the SQL text, only used for trace logging
+     * @throws Exception any failure, rethrown after the JDBC resources have been closed
+     */
+    protected void processStreamList(Exchange exchange, PreparedStatementCreator statementCreator, String preparedQuery) throws Exception {
+        log.trace("processStreamList: {}", preparedQuery);
+
+        // do not use the jdbcTemplate as it will auto-close connection/ps/rs when exiting the execute method
+        // and we need to keep the connection alive while routing and close it when the Exchange is done being routed
+        Connection con = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            con = dataSource.getConnection();
+            ps = statementCreator.createPreparedStatement(con);
+
+            boolean isResultSet = ps.execute();
+            if (isResultSet) {
+                rs = ps.getResultSet();
+                ResultSetIterator iterator = getEndpoint().queryForStreamList(con, ps, rs);
+                if (getEndpoint().isNoop()) {
+                    exchange.getOut().setBody(exchange.getIn().getBody());
+                } else if (getEndpoint().getOutputHeader() != null) {
+                    exchange.getOut().setBody(exchange.getIn().getBody());
+                    exchange.getOut().setHeader(getEndpoint().getOutputHeader(), iterator);
+                } else {
+                    exchange.getOut().setBody(iterator);
+                }
+                // we do not know the row count so we cannot set a ROW_COUNT header
+                // defer closing the iterator when the exchange is complete
+                exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator));
+            } else {
+                // no result set means no iterator and no completion was registered to close
+                // the statement and connection later, so release them now to avoid a leak
+                closeStatement(ps);
+                closeConnection(con);
+            }
+        } catch (Exception e) {
+            // in case of exception then close all this before rethrow,
+            // innermost resource first: result set, then statement, then connection
+            closeResultSet(rs);
+            closeStatement(ps);
+            closeConnection(con);
+            throw e;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerStreamListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerStreamListTest.java b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerStreamListTest.java
new file mode 100644
index 0000000..f1200c5
--- /dev/null
+++ b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlProducerStreamListTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elsql;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+/**
+ * Tests the elsql producer with <tt>outputType=StreamList</tt>: the producer returns an iterator
+ * which can be consumed directly or split in streaming mode, optionally mapped to a model class.
+ */
+public class ElSqlProducerStreamListTest extends CamelTestSupport {
+
+    // the endpoint under test: the allProjects query with streaming output
+    private static final String STREAM_LIST_URI = "elsql:allProjects:elsql/projects.elsql?outputType=StreamList";
+    // same endpoint, but each row is mapped to the Project model class
+    private static final String STREAM_LIST_MODEL_URI = STREAM_LIST_URI + "&outputClass=org.apache.camel.component.elsql.Project";
+
+    private EmbeddedDatabase db;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+
+        // embedded Derby database created with some initial data for the unit test
+        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
+        builder.setType(EmbeddedDatabaseType.DERBY);
+        builder.addScript("sql/createAndPopulateDatabase.sql");
+        db = builder.build();
+
+        registry.bind("dataSource", db);
+        return registry;
+    }
+
+    @Test
+    public void testReturnAnIterator() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "testmsg");
+
+        result.assertIsSatisfied();
+        // without a splitter the body is the streaming iterator itself
+        assertThat(resultBodyAt(result, 0), instanceOf(Iterator.class));
+    }
+
+    @Test
+    public void testSplit() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(3);
+
+        template.sendBody("direct:withSplit", "testmsg");
+
+        result.assertIsSatisfied();
+        // every split message carries one row as a Map
+        for (int row = 0; row < 3; row++) {
+            assertThat(resultBodyAt(result, row), instanceOf(Map.class));
+        }
+    }
+
+    @Test
+    public void testSplitWithModel() throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(3);
+
+        template.sendBody("direct:withSplitModel", "testmsg");
+
+        result.assertIsSatisfied();
+        // every split message carries one row mapped to the Project class
+        for (int row = 0; row < 3; row++) {
+            assertThat(resultBodyAt(result, row), instanceOf(Project.class));
+        }
+    }
+
+    /**
+     * Returns the IN body of the exchange received at the given index, asserting that it was received.
+     */
+    private Object resultBodyAt(MockEndpoint result, int index) {
+        return result.assertExchangeReceived(index).getIn().getBody();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // stop Camel first, then shut down the embedded database
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                ElsqlComponent elsql = getContext().getComponent("elsql", ElsqlComponent.class);
+                elsql.setDataSource(db);
+
+                // iterator is delivered as-is
+                from("direct:start")
+                    .to(STREAM_LIST_URI)
+                    .to("log:stream")
+                    .to("mock:result");
+
+                // iterator is split row by row in streaming mode
+                from("direct:withSplit")
+                    .to(STREAM_LIST_URI)
+                    .to("log:stream")
+                    .split(body()).streaming()
+                        .to("log:row")
+                        .to("mock:result")
+                    .end();
+
+                // as above, with rows mapped to the model class
+                from("direct:withSplitModel")
+                    .to(STREAM_LIST_MODEL_URI)
+                    .to("log:stream")
+                    .split(body()).streaming()
+                        .to("log:row")
+                        .to("mock:result")
+                    .end();
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/88bed23b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java
index 9f6c19e..2610876 100644
--- a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java
+++ b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/Project.java
@@ -45,4 +45,10 @@ public class Project {
     public void setName(String name) {
         this.name = name;
     }
+
+    @Override
+    public String toString() {
+        return "Project{id=" + id + ", name='" + name + '\'' + ", license='" + license + '\'' + '}';
+    }
+
 }