You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/07 03:48:32 UTC
[camel] branch camel-2.24.x updated: CAMEL-14035: JDBC StreamList
and outputClass does not work
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.24.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.24.x by this push:
new ab643e0 CAMEL-14035: JDBC StreamList and outputClass does not work
ab643e0 is described below
commit ab643e0b55fb5780dab84c31718aae3bf8043ef2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 7 05:43:42 2019 +0200
CAMEL-14035: JDBC StreamList and outputClass does not work
---
.../apache/camel/component/jdbc/JdbcHelper.java | 58 +++++++++++++++++
.../apache/camel/component/jdbc/JdbcProducer.java | 33 +---------
.../camel/component/jdbc/StreamListIterator.java | 73 ++++++++++++++++++++++
...roducerOutputTypeStreamListOutputClassTest.java | 69 ++++++++++++++++++++
4 files changed, 203 insertions(+), 30 deletions(-)
diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcHelper.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcHelper.java
new file mode 100644
index 0000000..c6468ba
--- /dev/null
+++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc;
+
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.util.IntrospectionSupport;
+
+public final class JdbcHelper {
+
+ private JdbcHelper() {
+ }
+
+ public static Object newBeanInstance(CamelContext camelContext, String outputClass,
+ BeanRowMapper beanRowMapper, Map<String, Object> row) throws SQLException {
+ Class<?> clazz = camelContext.getClassResolver().resolveClass(outputClass);
+ Object answer = camelContext.getInjector().newInstance(clazz);
+
+ Map<String, Object> properties = new LinkedHashMap<>();
+
+ // map row names using the bean row mapper
+ for (Map.Entry<String, Object> entry : row.entrySet()) {
+ Object value = entry.getValue();
+ String name = beanRowMapper.map(entry.getKey(), value);
+ properties.put(name, value);
+ }
+ try {
+ IntrospectionSupport.setProperties(answer, properties);
+ } catch (Exception e) {
+ throw new SQLException("Error setting properties on output class " + outputClass, e);
+ }
+
+ // check we could map all properties to the bean
+ if (!properties.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Cannot map all properties to bean of type " + outputClass + ". There are " + properties.size() + " unmapped properties. " + properties);
+ }
+ return answer;
+ }
+
+}
diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
index 0406930..c5f0a9a 100644
--- a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
+++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
@@ -25,7 +25,6 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -324,7 +323,7 @@ public class JdbcProducer extends DefaultProducer {
JdbcOutputType outputType = getEndpoint().getOutputType();
exchange.getOut().setHeader(JdbcConstants.JDBC_COLUMN_NAMES, iterator.getColumnNames());
if (outputType == JdbcOutputType.StreamList) {
- exchange.getOut().setBody(iterator);
+ exchange.getOut().setBody(new StreamListIterator(getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), iterator));
exchange.addOnCompletion(new ResultSetIteratorCompletion(iterator));
// do not close resources as we are in streaming mode
answer = false;
@@ -347,7 +346,7 @@ public class JdbcProducer extends DefaultProducer {
Map<String, Object> row = iterator.next();
Object value;
if (getEndpoint().getOutputClass() != null) {
- value = newBeanInstance(row);
+ value = JdbcHelper.newBeanInstance(getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), row);
} else {
value = row;
}
@@ -365,7 +364,7 @@ public class JdbcProducer extends DefaultProducer {
if (iterator.hasNext()) {
throw new SQLDataException("Query result not unique for outputType=SelectOne.");
} else if (getEndpoint().getOutputClass() != null) {
- return newBeanInstance(row);
+ return JdbcHelper.newBeanInstance(getEndpoint().getCamelContext(), getEndpoint().getOutputClass(), getEndpoint().getBeanRowMapper(), row);
} else if (row.size() == 1) {
return row.values().iterator().next();
} else {
@@ -373,32 +372,6 @@ public class JdbcProducer extends DefaultProducer {
}
}
- private Object newBeanInstance(Map<String, Object> row) throws SQLException {
- Class<?> outputClass = getEndpoint().getCamelContext().getClassResolver().resolveClass(getEndpoint().getOutputClass());
- Object answer = getEndpoint().getCamelContext().getInjector().newInstance(outputClass);
-
- Map<String, Object> properties = new LinkedHashMap<>();
-
- // map row names using the bean row mapper
- for (Map.Entry<String, Object> entry : row.entrySet()) {
- Object value = entry.getValue();
- String name = getEndpoint().getBeanRowMapper().map(entry.getKey(), value);
- properties.put(name, value);
- }
- try {
- IntrospectionSupport.setProperties(answer, properties);
- } catch (Exception e) {
- throw new SQLException("Error setting properties on output class " + outputClass, e);
- }
-
- // check we could map all properties to the bean
- if (!properties.isEmpty()) {
- throw new IllegalArgumentException(
- "Cannot map all properties to bean of type " + outputClass + ". There are " + properties.size() + " unmapped properties. " + properties);
- }
- return answer;
- }
-
private static final class ResultSetIteratorCompletion implements Synchronization {
private final ResultSetIterator iterator;
diff --git a/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/StreamListIterator.java b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/StreamListIterator.java
new file mode 100644
index 0000000..6d483ff
--- /dev/null
+++ b/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/StreamListIterator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.camel.CamelContext;
+
+import static org.apache.camel.component.jdbc.JdbcHelper.newBeanInstance;
+
+public final class StreamListIterator implements Iterator {
+
+ private final CamelContext camelContext;
+ private final String outputClass;
+ private final BeanRowMapper beanRowMapper;
+ private final Iterator delegate;
+
+ public StreamListIterator(CamelContext camelContext, String outputClass,
+ BeanRowMapper beanRowMapper, Iterator delegate) {
+ this.camelContext = camelContext;
+ this.outputClass = outputClass;
+ this.beanRowMapper = beanRowMapper;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public Object next() {
+ Object answer;
+ Map row = (Map) delegate.next();
+ if (row != null && outputClass != null) {
+ try {
+ answer = newBeanInstance(camelContext, outputClass, beanRowMapper, row);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ answer = row;
+ }
+ return answer;
+ }
+
+ @Override
+ public void remove() {
+ delegate.remove();
+ }
+
+ @Override
+ public void forEachRemaining(Consumer action) {
+ delegate.forEachRemaining(action);
+ }
+}
diff --git a/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListOutputClassTest.java b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListOutputClassTest.java
new file mode 100644
index 0000000..2df8750
--- /dev/null
+++ b/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerOutputTypeStreamListOutputClassTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jdbc;
+
+import java.util.Iterator;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+public class JdbcProducerOutputTypeStreamListOutputClassTest extends AbstractJdbcTestSupport {
+ private static final String QUERY = "select * from customer";
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint result;
+
+ @Test
+ public void shouldReturnAnIterator() throws Exception {
+ result.expectedMessageCount(1);
+
+ template.sendBody("direct:start", QUERY);
+
+ result.assertIsSatisfied();
+ assertThat(resultBodyAt(0), instanceOf(Iterator.class));
+ }
+
+ @Test
+ public void shouldStreamResultRows() throws Exception {
+ result.expectedMessageCount(3);
+
+ template.sendBody("direct:withSplit", QUERY);
+
+ result.assertIsSatisfied();
+ assertThat(resultBodyAt(0), instanceOf(CustomerModel.class));
+ assertThat(resultBodyAt(1), instanceOf(CustomerModel.class));
+ assertThat(resultBodyAt(2), instanceOf(CustomerModel.class));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:start").to("jdbc:testdb?outputType=StreamList&outputClass=org.apache.camel.component.jdbc.CustomerModel").to("mock:result");
+ from("direct:withSplit").to("jdbc:testdb?outputType=StreamList&outputClass=org.apache.camel.component.jdbc.CustomerModel").split(body()).to("mock:result");
+ }
+ };
+ }
+
+ private Object resultBodyAt(int index) {
+ return result.assertExchangeReceived(index).getIn().getBody();
+ }
+}