You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/07 03:48:32 UTC

[camel] branch camel-2.24.x updated: CAMEL-14035: JDBC StreamList and outputClass does not work

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.24.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.24.x by this push:
     new ab643e0  CAMEL-14035: JDBC StreamList and outputClass does not work
ab643e0 is described below

commit ab643e0b55fb5780dab84c31718aae3bf8043ef2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 7 05:43:42 2019 +0200

    CAMEL-14035: JDBC StreamList and outputClass does not work
---
 .../apache/camel/component/jdbc/JdbcHelper.java    | 58 +++++++++++++++++
 .../apache/camel/component/jdbc/JdbcProducer.java  | 33 +---------
 .../camel/component/jdbc/StreamListIterator.java   | 73 ++++++++++++++++++++++
 ...roducerOutputTypeStreamListOutputClassTest.java | 69 ++++++++++++++++++++
 4 files changed, 203 insertions(+), 30 deletions(-)

diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcHelper.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcHelper.java
new file mode 100644
index 0000000..c6468ba
--- /dev/null
+++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc;
+
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.util.IntrospectionSupport;
+
+public final class JdbcHelper {
+
+    private JdbcHelper() {
+    }
+
+    public static Object newBeanInstance(CamelContext camelContext, String outputClass,
+                                         BeanRowMapper beanRowMapper, Map<String, Object> row) throws SQLException {
+        Class<?> clazz = camelContext.getClassResolver().resolveClass(outputClass);
+        Object answer = camelContext.getInjector().newInstance(clazz);
+
+        Map<String, Object> properties = new LinkedHashMap<>();
+
+        // map row names using the bean row mapper
+        for (Map.Entry<String, Object> entry : row.entrySet()) {
+            Object value = entry.getValue();
+            String name = beanRowMapper.map(entry.getKey(), value);
+            properties.put(name, value);
+        }
+        try {
+            IntrospectionSupport.setProperties(answer, properties);
+        } catch (Exception e) {
+            throw new SQLException("Error setting properties on output class " + outputClass, e);
+        }
+
+        // check we could map all properties to the bean
+        if (!properties.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Cannot map all properties to bean of type " + outputClass + ". There are " + properties.size() + " unmapped properties. " + properties);
+        }
+        return answer;
+    }
+
+}
diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
index 0406930..c5f0a9a 100644
--- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
+++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
@@ -25,7 +25,6 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -324,7 +323,7 @@ public class JdbcProducer extends DefaultProducer {
         JdbcOutputType outputType = getEndpoint().getOutputType();
         exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, iterator.getColumnNames());
         if (outputType == JdbcOutputType.StreamList) {
-            exchange.getOut().setBody(iterator);
+            exchange.getOut().setBody(new StreamListIterator(getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), iterator));
             exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator));
             // do not close resources as we are in streaming mode
             answer = false;
@@ -347,7 +346,7 @@ public class JdbcProducer extends DefaultProducer {
             Map<String, Object> row = iterator.next();
             Object value;
             if (getEndpoint().getOutputClass() != null) {
-                value = newBeanInstance(row);
+                value = JdbcHelper.newBeanInstance(getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), row);
             } else {
                 value = row;
             }
@@ -365,7 +364,7 @@ public class JdbcProducer extends DefaultProducer {
         if (iterator.hasNext()) {
             throw new SQLDataException("Query result not unique for outputType=SelectOne.");
         } else if (getEndpoint().getOutputClass() != null) {
-            return newBeanInstance(row);
+            return JdbcHelper.newBeanInstance(getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), row);
         } else if (row.size() == 1) {
             return row.values().iterator().next();
         } else {
@@ -373,32 +372,6 @@ public class JdbcProducer extends DefaultProducer {
         }
     }
 
-    private Object newBeanInstance(Map<String, Object> row) throws SQLException {
-        Class<?> outputClass = getEndpoint().getCamelContext().getClassResolver().resolveClass(getEndpoint().getOutputClass());
-        Object answer = getEndpoint().getCamelContext().getInjector().newInstance(outputClass);
-
-        Map<String, Object> properties = new LinkedHashMap<>();
-
-        // map row names using the bean row mapper
-        for (Map.Entry<String, Object> entry : row.entrySet()) {
-            Object value = entry.getValue();
-            String name = getEndpoint().getBeanRowMapper().map(entry.getKey(), value);
-            properties.put(name, value);
-        }
-        try {
-            IntrospectionSupport.setProperties(answer, properties);
-        } catch (Exception e) {
-            throw new SQLException("Error setting properties on output class " + outputClass, e);
-        }
-
-        // check we could map all properties to the bean
-        if (!properties.isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Cannot map all properties to bean of type " + outputClass + ". There are " + properties.size() + " unmapped properties. " + properties);
-        }
-        return answer;
-    }
-
     private static final class ResultSetIteratorCompletion implements Synchronization {
         private final ResultSetIterator iterator;
 
diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/StreamListIterator.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/StreamListIterator.java
new file mode 100644
index 0000000..6d483ff
--- /dev/null
+++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/StreamListIterator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.camel.CamelContext;
+
+import static org.apache.camel.component.jdbc.JdbcHelper.newBeanInstance;
+
+public final class StreamListIterator implements Iterator {
+
+    private final CamelContext camelContext;
+    private final String outputClass;
+    private final BeanRowMapper beanRowMapper;
+    private final Iterator delegate;
+
+    public StreamListIterator(CamelContext camelContext, String outputClass,
+                              BeanRowMapper beanRowMapper, Iterator delegate) {
+        this.camelContext = camelContext;
+        this.outputClass = outputClass;
+        this.beanRowMapper = beanRowMapper;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return delegate.hasNext();
+    }
+
+    @Override
+    public Object next() {
+        Object answer;
+        Map row = (Map) delegate.next();
+        if (row != null && outputClass != null) {
+            try {
+                answer = newBeanInstance(camelContext, outputClass, beanRowMapper, row);
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            answer = row;
+        }
+        return answer;
+    }
+
+    @Override
+    public void remove() {
+        delegate.remove();
+    }
+
+    @Override
+    public void forEachRemaining(Consumer action) {
+        delegate.forEachRemaining(action);
+    }
+}
diff --git a/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListOutputClassTest.java b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListOutputClassTest.java
new file mode 100644
index 0000000..2df8750
--- /dev/null
+++ b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListOutputClassTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc;
+
+import java.util.Iterator;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+public class JdbcProducerOutputTypeStreamListOutputClassTest extends AbstractJdbcTestSupport {
+    private static final String QUERY = "select * from customer";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void shouldReturnAnIterator() throws Exception {
+        result.expectedMessageCount(1);
+
+        template.sendBody("direct:start", QUERY);
+
+        result.assertIsSatisfied();
+        assertThat(resultBodyAt(0), instanceOf(Iterator.class));
+    }
+
+    @Test
+    public void shouldStreamResultRows() throws Exception {
+        result.expectedMessageCount(3);
+
+        template.sendBody("direct:withSplit", QUERY);
+
+        result.assertIsSatisfied();
+        assertThat(resultBodyAt(0), instanceOf(CustomerModel.class));
+        assertThat(resultBodyAt(1), instanceOf(CustomerModel.class));
+        assertThat(resultBodyAt(2), instanceOf(CustomerModel.class));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:start").to("jdbc:testdb?outputType=StreamList&outputClass=org.apache.camel.component.jdbc.CustomerModel").to("mock:result");
+                from("direct:withSplit").to("jdbc:testdb?outputType=StreamList&outputClass=org.apache.camel.component.jdbc.CustomerModel").split(body()).to("mock:result");
+            }
+        };
+    }
+
+    private Object resultBodyAt(int index) {
+        return result.assertExchangeReceived(index).getIn().getBody();
+    }
+}