You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/30 10:48:33 UTC

camel git commit: CAMEL-9849: camel-sql - Add support for OutputType=StreamList in the producer

Repository: camel
Updated Branches:
  refs/heads/master 32417731b -> dda7ca31d


CAMEL-9849: camel-sql - Add support for OutputType=StreamList in the producer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dda7ca31
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dda7ca31
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dda7ca31

Branch: refs/heads/master
Commit: dda7ca31d822315f574f588ab1fdd7fccf0562ca
Parents: 3241773
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Apr 30 10:48:24 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Apr 30 10:48:24 2016 +0200

----------------------------------------------------------------------
 .../camel/component/sql/DefaultSqlEndpoint.java |  18 ++-
 .../camel/component/sql/ResultSetIterator.java  | 114 +++++++++++++++++
 .../sql/ResultSetIteratorCompletion.java        |  35 ++++++
 .../camel/component/sql/SqlOutputType.java      |   2 +-
 .../apache/camel/component/sql/SqlProducer.java |  79 +++++++++++-
 .../camel/component/sql/ProjectModel.java       |   5 +
 .../SqlProducerOutputTypeStreamListTest.java    | 123 +++++++++++++++++++
 7 files changed, 373 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
index 8a13a74..abcfc95 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
@@ -16,9 +16,11 @@
  */
 package org.apache.camel.component.sql;
 
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.List;
 import java.util.Map;
 import javax.sql.DataSource;
@@ -92,7 +94,9 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
             + "b) If the query has more than one column, then it will return a Map of that result."
             + "c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names."
             + "It will assume your class has a default constructor to create instance with."
-            + "d) If the query resulted in more than one rows, it throws an non-unique result exception.")
+            + "d) If the query resulted in more than one rows, it throws an non-unique result exception."
+            + "StreamList can only be used by the producer that streams the result of the query using an Iterator. This can be used with the Splitter EIP in streaming"
+            + " mode to process the ResultSet in streaming fashion.")
     private SqlOutputType outputType = SqlOutputType.SelectList;
     @UriParam(description = "Specify the full package and class name to use as conversion when outputType=SelectOne.")
     private String outputClass;
@@ -494,4 +498,16 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
         return result;
     }
 
+    @SuppressWarnings("unchecked")
+    public ResultSetIterator queryForStreamList(Connection connection, Statement statement, ResultSet rs) throws SQLException {
+        if (outputClass == null) {
+            RowMapper rowMapper = new ColumnMapRowMapper();
+            return new ResultSetIterator(connection, statement, rs, rowMapper);
+        } else {
+            Class<?> outputClzz = getCamelContext().getClassResolver().resolveClass(outputClass);
+            RowMapper rowMapper = new BeanPropertyRowMapper(outputClzz);
+            return new ResultSetIterator(connection, statement, rs, rowMapper);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIterator.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIterator.java
new file mode 100644
index 0000000..837735d
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIterator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.io.Closeable;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.RuntimeCamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.RowMapper;
+
+public class ResultSetIterator implements Iterator, Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResultSetIterator.class);
+
+    private final Connection connection;
+    private final Statement statement;
+    private final ResultSet resultSet;
+    private final RowMapper rowMapper;
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private int rowNum;
+
+    public ResultSetIterator(Connection connection, Statement statement, ResultSet resultSet, RowMapper rowMapper) throws SQLException {
+        this.connection = connection;
+        this.statement = statement;
+        this.resultSet = resultSet;
+        this.rowMapper = rowMapper;
+
+        loadNext();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !closed.get();
+    }
+
+    @Override
+    public Object next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        try {
+            Object next = rowMapper.mapRow(resultSet, rowNum++);
+            loadNext();
+            return next;
+        } catch (SQLException e) {
+            close();
+            throw new RuntimeCamelException("Cannot process result", e);
+        }
+    }
+
+    private void loadNext() throws SQLException {
+        boolean hasNext = resultSet.next();
+        if (!hasNext) {
+            close();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (closed.compareAndSet(false, true)) {
+            safeCloseResultSet();
+            safeCloseStatement();
+            safeCloseConnection();
+        }
+    }
+
+    private void safeCloseResultSet() {
+        try {
+            resultSet.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing result set: " + e, e);
+        }
+    }
+
+    private void safeCloseStatement() {
+        try {
+            statement.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing statement: " + e, e);
+        }
+    }
+
+    private void safeCloseConnection() {
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            LOG.warn("Error by closing connection: " + e, e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIteratorCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIteratorCompletion.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIteratorCompletion.java
new file mode 100644
index 0000000..59625d3
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/ResultSetIteratorCompletion.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.support.SynchronizationAdapter;
+
+public class ResultSetIteratorCompletion extends SynchronizationAdapter {
+
+    private final ResultSetIterator iterator;
+
+    public ResultSetIteratorCompletion(ResultSetIterator iterator) {
+        this.iterator = iterator;
+    }
+
+    @Override
+    public void onDone(Exchange exchange) {
+        iterator.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java
index 86a1bf6..216a908 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlOutputType.java
@@ -18,5 +18,5 @@ package org.apache.camel.component.sql;
 
 public enum SqlOutputType {
 
-    SelectOne, SelectList
+    SelectOne, SelectList, StreamList
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index da5f065..bf44374 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -32,7 +32,9 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
 import org.springframework.jdbc.core.PreparedStatementCreator;
 
+import static org.springframework.jdbc.support.JdbcUtils.closeConnection;
 import static org.springframework.jdbc.support.JdbcUtils.closeResultSet;
+import static org.springframework.jdbc.support.JdbcUtils.closeStatement;
 
 public class SqlProducer extends DefaultProducer {
     private final String query;
@@ -78,7 +80,6 @@ public class SqlProducer extends DefaultProducer {
         }
         final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sql, getEndpoint().isAllowNamedParameters(), exchange);
 
-        // CAMEL-7313 - check whether to return generated keys
         final Boolean shouldRetrieveGeneratedKeys =
             exchange.getIn().getHeader(SqlConstants.SQL_RETRIEVE_GENERATED_KEYS, false, Boolean.class);
 
@@ -104,6 +105,14 @@ public class SqlProducer extends DefaultProducer {
             }
         };
 
+        // special for processing stream list (batch not supported)
+        SqlOutputType outputType = getEndpoint().getOutputType();
+        if (outputType == SqlOutputType.StreamList) {
+            processStreamList(exchange, statementCreator, sql, preparedQuery);
+            return;
+        }
+
+        log.trace("jdbcTemplate.execute: {}", preparedQuery);
         jdbcTemplate.execute(statementCreator, new PreparedStatementCallback<Map<?, ?>>() {
             public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
                 ResultSet rs = null;
@@ -224,6 +233,74 @@ public class SqlProducer extends DefaultProducer {
         });
     }
 
+    protected void processStreamList(Exchange exchange, PreparedStatementCreator statementCreator, String sql, String preparedQuery) throws Exception {
+        log.trace("processStreamList: {}", preparedQuery);
+
+        // do not use the jdbcTemplate as it will auto-close connection/ps/rs when exiting the execute method
+        // and we need to keep the connection alive while routing and close it when the Exchange is done being routed
+        Connection con = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            con = jdbcTemplate.getDataSource().getConnection();
+            ps = statementCreator.createPreparedStatement(con);
+
+            int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount();
+
+            // only populate if really needed
+            if (alwaysPopulateStatement || expected > 0) {
+                // transfer incoming message body data to prepared statement parameters, if necessary
+                if (batch) {
+                    Iterator<?> iterator;
+                    if (useMessageBodyForSql) {
+                        iterator = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS, Iterator.class);
+                    } else {
+                        iterator = exchange.getIn().getBody(Iterator.class);
+                    }
+                    while (iterator != null && iterator.hasNext()) {
+                        Object value = iterator.next();
+                        Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+                        sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+                        ps.addBatch();
+                    }
+                } else {
+                    Object value;
+                    if (useMessageBodyForSql) {
+                        value = exchange.getIn().getHeader(SqlConstants.SQL_PARAMETERS);
+                    } else {
+                        value = exchange.getIn().getBody();
+                    }
+                    Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+                    sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+                }
+            }
+
+            boolean isResultSet = ps.execute();
+            if (isResultSet) {
+                rs = ps.getResultSet();
+                ResultSetIterator iterator = getEndpoint().queryForStreamList(con, ps, rs);
+                if (getEndpoint().isNoop()) {
+                    exchange.getOut().setBody(exchange.getIn().getBody());
+                } else if (getEndpoint().getOutputHeader() != null) {
+                    exchange.getOut().setBody(exchange.getIn().getBody());
+                    exchange.getOut().setHeader(getEndpoint().getOutputHeader(), iterator);
+                } else {
+                    exchange.getOut().setBody(iterator);
+                }
+                // we do not know the row count so we cannot set a ROW_COUNT header
+                // defer closing the iterator when the exchange is complete
+                exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator));
+            }
+        } catch (Exception e) {
+            // in case of exception then close all this before rethrow
+            closeConnection(con);
+            closeStatement(ps);
+            closeResultSet(rs);
+            throw e;
+        }
+    }
+
     public void setParametersCount(int parametersCount) {
         this.parametersCount = parametersCount;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java
index 2160800..f7d01d6 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/ProjectModel.java
@@ -45,4 +45,9 @@ public class ProjectModel {
     public String getLicense() {
         return license;
     }
+
+    @Override
+    public String toString() {
+        return "ProjectModel{id=" + id + ", project='" + project + '\'' + ", license='" + license + '\'' + '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/dda7ca31/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
new file mode 100644
index 0000000..a0a4034
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerOutputTypeStreamListTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+public class SqlProducerOutputTypeStreamListTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+                .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void testReturnAnIterator() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "testmsg");
+
+        mock.assertIsSatisfied();
+        assertThat(resultBodyAt(mock, 0), instanceOf(Iterator.class));
+    }
+
+    @Test
+    public void testSplit() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+
+        template.sendBody("direct:withSplit", "testmsg");
+
+        mock.assertIsSatisfied();
+        assertThat(resultBodyAt(mock, 0), instanceOf(Map.class));
+        assertThat(resultBodyAt(mock, 1), instanceOf(Map.class));
+        assertThat(resultBodyAt(mock, 2), instanceOf(Map.class));
+    }
+
+    @Test
+    public void testSplitWithModel() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+
+        template.sendBody("direct:withSplitModel", "testmsg");
+
+        mock.assertIsSatisfied();
+        assertThat(resultBodyAt(mock, 0), instanceOf(ProjectModel.class));
+        assertThat(resultBodyAt(mock, 1), instanceOf(ProjectModel.class));
+        assertThat(resultBodyAt(mock, 2), instanceOf(ProjectModel.class));
+    }
+
+    private Object resultBodyAt(MockEndpoint result, int index) {
+        return result.assertExchangeReceived(index).getIn().getBody();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("direct:start")
+                        .to("sql:select * from projects order by id?outputType=StreamList")
+                        .to("log:stream")
+                        .to("mock:result");
+
+                from("direct:withSplit")
+                        .to("sql:select * from projects order by id?outputType=StreamList")
+                        .to("log:stream")
+                        .split(body()).streaming()
+                            .to("log:row")
+                            .to("mock:result")
+                        .end();
+
+                from("direct:withSplitModel")
+                        .to("sql:select * from projects order by id?outputType=StreamList&outputClass=org.apache.camel.component.sql.ProjectModel")
+                        .to("log:stream")
+                        .split(body()).streaming()
+                            .to("log:row")
+                            .to("mock:result")
+                        .end();
+            }
+        };
+    }
+}