You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/18 11:07:17 UTC

camel git commit: CAMEL-9711: camel-sql - Add support for SQL IN in producer with dynamic values

Repository: camel
Updated Branches:
  refs/heads/master 924d1f2f9 -> 17fd2363a


CAMEL-9711: camel-sql - Add support for SQL IN in producer with dynamic values


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/17fd2363
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/17fd2363
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/17fd2363

Branch: refs/heads/master
Commit: 17fd2363a45141fc73ec459de42f4df66998e8e3
Parents: 924d1f2
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Mar 18 10:22:39 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Mar 18 11:05:29 2016 +0100

----------------------------------------------------------------------
 .../elsql/ElsqlSqlPrepareStatementStrategy.java |   2 +-
 .../sql/DefaultSqlPrepareStatementStrategy.java | 133 +++++++++++++++----
 .../sql/DefaultSqlProcessingStrategy.java       |   4 +-
 .../apache/camel/component/sql/SqlConsumer.java |   2 +-
 .../camel/component/sql/SqlInIterator.java      |  56 ++++++++
 .../sql/SqlPrepareStatementStrategy.java        |   3 +-
 .../apache/camel/component/sql/SqlProducer.java |   2 +-
 .../sql/SqlProducerInExpressionTest.java        |  38 ++++++
 .../sql/SqlProducerInQueryEndpointTest.java     |  38 ++++++
 .../camel/component/sql/SqlProducerInTest.java  | 122 +++++++++++++++++
 .../src/test/resources/sql/selectProjectsIn.sql |   5 +
 .../sql/selectProjectsInExpression.sql          |   5 +
 12 files changed, 379 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java
index d2ef797..0b998f2 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java
@@ -25,7 +25,7 @@ import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
 
 public class ElsqlSqlPrepareStatementStrategy implements SqlPrepareStatementStrategy {
 
-    public String prepareQuery(String query, boolean allowNamedParameters) throws SQLException {
+    public String prepareQuery(String query, boolean allowNamedParameters, Exchange exchange) throws SQLException {
         return query;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
index bed55d1..7e91e72 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
@@ -30,6 +30,8 @@ import java.util.regex.Pattern;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.language.simple.SimpleLanguage;
+import org.apache.camel.util.CollectionStringBuffer;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringQuoteHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +43,9 @@ import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
 public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementStrategy {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlPrepareStatementStrategy.class);
+    private static final Pattern REPLACE_IN_PATTERN = Pattern.compile("\\:\\?in\\:(\\w+|\\$\\{[^\\}]+\\})", Pattern.MULTILINE);
     private static final Pattern REPLACE_PATTERN = Pattern.compile("\\:\\?\\w+|\\:\\?\\$\\{[^\\}]+\\}", Pattern.MULTILINE);
+    private static final Pattern NAME_IN_PATTERN = Pattern.compile("\\:\\?(in\\:(\\w+|\\$\\{[^\\}]+\\}))", Pattern.MULTILINE);
     private static final Pattern NAME_PATTERN = Pattern.compile("\\:\\?(\\w+|\\$\\{[^\\}]+\\})", Pattern.MULTILINE);
     private final char separator;
 
@@ -54,9 +58,27 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt
     }
 
     @Override
-    public String prepareQuery(String query, boolean allowNamedParameters) throws SQLException {
+    public String prepareQuery(String query, boolean allowNamedParameters, final Exchange exchange) throws SQLException {
         String answer;
         if (allowNamedParameters && hasNamedParameters(query)) {
+            if (exchange != null) {
+                // replace all :?in:word with a number of placeholders for how many values are expected in the IN values
+                Matcher matcher = REPLACE_IN_PATTERN.matcher(query);
+                while (matcher.find()) {
+                    String found = matcher.group(1);
+                    Object parameter = lookupParameter(found, exchange, exchange.getIn().getBody());
+                    if (parameter != null) {
+                        Iterator it = createInParameterIterator(parameter);
+                        CollectionStringBuffer csb = new CollectionStringBuffer(",");
+                        while (it.hasNext()) {
+                            it.next();
+                            csb.append("\\?");
+                        }
+                        String replace = csb.toString();
+                        query = matcher.replaceAll(replace);
+                    }
+                }
+            }
             // replace all :?word and :?${foo} with just ?
             answer = REPLACE_PATTERN.matcher(query).replaceAll("\\?");
         } else {
@@ -104,12 +126,27 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt
 
         while (iterator != null && iterator.hasNext()) {
             Object value = iterator.next();
-            LOG.trace("Setting parameter #{} with value: {}", argNumber, value);
-            if (argNumber <= expectedParams) {
-                args[i] = value;
+
+            // special for SQL IN where we need to set dynamic number of values
+            if (value instanceof SqlInIterator) {
+                Iterator it = (Iterator) value;
+                while (it.hasNext()) {
+                    Object val = it.next();
+                    LOG.trace("Setting parameter #{} with value: {}", argNumber, val);
+                    if (argNumber <= expectedParams) {
+                        args[i] = val;
+                    }
+                    argNumber++;
+                    i++;
+                }
+            } else {
+                LOG.trace("Setting parameter #{} with value: {}", argNumber, value);
+                if (argNumber <= expectedParams) {
+                    args[i] = value;
+                }
+                argNumber++;
+                i++;
             }
-            argNumber++;
-            i++;
         }
         if (argNumber - 1 != expectedParams) {
             throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was: " + (argNumber - 1));
@@ -127,19 +164,63 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt
 
     private static final class NamedQueryParser {
 
+        private final Matcher inMatcher;
         private final Matcher matcher;
+        // which mode are we in
+        private boolean in;
+        private boolean name;
 
         private NamedQueryParser(String query) {
+            this.inMatcher = NAME_IN_PATTERN.matcher(query);
             this.matcher = NAME_PATTERN.matcher(query);
         }
 
         public String next() {
-            if (!matcher.find()) {
-                return null;
+            if (!name && inMatcher.find()) {
+                // turn on in mode, so we only match using in matcher next time
+                in = true;
+                return inMatcher.group(1);
+            } else if (!in && matcher.find()) {
+                // turn on name mode, so we only match using name matcher next time
+                name = true;
+                return matcher.group(1);
             }
 
-            return matcher.group(1);
+            return null;
+        }
+    }
+
+    protected static Object lookupParameter(String nextParam, Exchange exchange, Object body) {
+        Map<?, ?> bodyMap = safeMap(exchange.getContext().getTypeConverter().tryConvertTo(Map.class, body));
+        Map<?, ?> headersMap = safeMap(exchange.getIn().getHeaders());
+
+        Object answer = null;
+        if (nextParam.startsWith("${") && nextParam.endsWith("}")) {
+            answer = SimpleLanguage.expression(nextParam).evaluate(exchange, Object.class);
+        } else if (bodyMap.containsKey(nextParam)) {
+            answer = bodyMap.get(nextParam);
+        } else if (headersMap.containsKey(nextParam)) {
+            answer = headersMap.get(nextParam);
+        }
+
+        return answer;
+    }
+
+    private static Map<?, ?> safeMap(Map<?, ?> map) {
+        return (map == null || map.isEmpty()) ? Collections.emptyMap() : map;
+    }
+
+    protected static Iterator createInParameterIterator(Object value) {
+        Iterator it;
+        // if the body is a String then honor quotes etc.
+        if (value instanceof String) {
+            String[] tokens = StringQuoteHelper.splitSafeQuote((String) value, ',', true);
+            List<String> list = Arrays.asList(tokens);
+            it = list.iterator();
+        } else {
+            it = ObjectHelper.createIterator(value, null);
         }
+        return new SqlInIterator(it);
     }
 
     private static final class PopulateIterator implements Iterator<Object> {
@@ -148,17 +229,14 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt
         private final String query;
         private final NamedQueryParser parser;
         private final Exchange exchange;
-        private final Map<?, ?> bodyMap;
-        private final Map<?, ?> headersMap;
+        private final Object body;
         private String nextParam;
 
         private PopulateIterator(String query, Exchange exchange, Object body) {
             this.query = query;
-            this.parser = new NamedQueryParser(query);
             this.exchange = exchange;
-            this.bodyMap = safeMap(exchange.getContext().getTypeConverter().tryConvertTo(Map.class, body));
-            this.headersMap = safeMap(exchange.getIn().getHeaders());
-
+            this.body = body;
+            this.parser = new NamedQueryParser(query);
             this.nextParam = parser.next();
         }
 
@@ -173,18 +251,26 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt
                 throw new NoSuchElementException();
             }
 
+            boolean in = false;
+            if (nextParam.startsWith("in:")) {
+                in = true;
+                nextParam = nextParam.substring(3);
+            }
+
+            Object next = null;
             try {
-                if (nextParam.startsWith("${") && nextParam.endsWith("}")) {
-                    return SimpleLanguage.expression(nextParam).evaluate(exchange, Object.class);
-                } else if (bodyMap.containsKey(nextParam)) {
-                    return bodyMap.get(nextParam);
-                } else if (headersMap.containsKey(nextParam)) {
-                    return headersMap.get(nextParam);
+                next = lookupParameter(nextParam, exchange, body);
+                if (in && next != null) {
+                    next = createInParameterIterator(next);
+                }
+                if (next == null) {
+                    throw new RuntimeExchangeException(String.format(MISSING_PARAMETER_EXCEPTION, nextParam, query), exchange);
                 }
-                throw new RuntimeExchangeException(String.format(MISSING_PARAMETER_EXCEPTION, nextParam, query), exchange);
             } finally {
                 nextParam = parser.next();
             }
+
+            return next;
         }
 
         @Override
@@ -192,8 +278,5 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt
             throw new UnsupportedOperationException();
         }
 
-        private static Map<?, ?> safeMap(Map<?, ?> map) {
-            return (map == null || map.isEmpty()) ? Collections.emptyMap() : map;
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
index 1d7f96d..281bba1 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
@@ -41,7 +41,7 @@ public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
     @Override
     public int commit(final DefaultSqlEndpoint endpoint, final Exchange exchange, final Object data, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
 
-        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters());
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters(), exchange);
 
         return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
             public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
@@ -67,7 +67,7 @@ public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
 
     @Override
     public int commitBatchComplete(final DefaultSqlEndpoint endpoint, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
-        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters());
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters(), null);
 
         return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
             public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index a9bc8ea..74bad30 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -106,7 +106,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
         shutdownRunningTask = null;
         pendingExchanges = 0;
 
-        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(resolvedQuery, getEndpoint().isAllowNamedParameters());
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(resolvedQuery, getEndpoint().isAllowNamedParameters(), null);
         final PreparedStatementCallback<Integer> callback = new PreparedStatementCallback<Integer>() {
             @Override
             public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException, DataAccessException {

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlInIterator.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlInIterator.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlInIterator.java
new file mode 100644
index 0000000..32136bb
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlInIterator.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.Iterator;
+import java.util.function.Consumer;
+
+/**
+ * Iterator used for SQL IN query.
+ * <p/>
+ * This ensures we know the parameters is an IN parameter and the values are dynamic and must be
+ * set using this iterator.
+ */
+public class SqlInIterator implements Iterator {
+
+    private final Iterator it;
+
+    public SqlInIterator(Iterator it) {
+        this.it = it;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return it.hasNext();
+    }
+
+    @Override
+    public Object next() {
+        return it.next();
+    }
+
+    @Override
+    public void remove() {
+        it.remove();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void forEachRemaining(Consumer action) {
+        it.forEachRemaining(action);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlPrepareStatementStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlPrepareStatementStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlPrepareStatementStrategy.java
index 8e52157..07cda80 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlPrepareStatementStrategy.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlPrepareStatementStrategy.java
@@ -32,9 +32,10 @@ public interface SqlPrepareStatementStrategy {
      *
      * @param query                 the query which may contain named query parameters
      * @param allowNamedParameters  whether named parameters is allowed
+     * @param exchange              the current exchange
      * @return the query to actually use, which must be accepted by the JDBC driver.
      */
-    String prepareQuery(String query, boolean allowNamedParameters) throws SQLException;
+    String prepareQuery(String query, boolean allowNamedParameters, Exchange exchange) throws SQLException;
 
     /**
      * Creates the iterator to use when setting query parameters on the prepared statement.

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index 3d70acd..6e17cd0 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -76,7 +76,7 @@ public class SqlProducer extends DefaultProducer {
             String queryHeader = exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class);
             sql = queryHeader != null ? queryHeader : resolvedQuery;
         }
-        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sql, getEndpoint().isAllowNamedParameters());
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sql, getEndpoint().isAllowNamedParameters(), exchange);
 
         // CAMEL-7313 - check whether to return generated keys
         final Boolean shouldRetrieveGeneratedKeys =

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInExpressionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInExpressionTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInExpressionTest.java
new file mode 100644
index 0000000..6fd0e94
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInExpressionTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.builder.RouteBuilder;
+
+public class SqlProducerInExpressionTest extends SqlProducerInTest {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // required for the sql component
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("direct:query")
+                    .to("sql:classpath:sql/selectProjectsInExpression.sql")
+                    .to("log:query")
+                    .to("mock:query");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInQueryEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInQueryEndpointTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInQueryEndpointTest.java
new file mode 100644
index 0000000..78b0021
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInQueryEndpointTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.builder.RouteBuilder;
+
+public class SqlProducerInQueryEndpointTest extends SqlProducerInTest {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // required for the sql component
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("direct:query")
+                    .to("sql:select * from projects where project in (:#in:names) order by id")
+                    .to("log:query")
+                    .to("mock:query");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java
new file mode 100644
index 0000000..401a35f
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerInTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+public class SqlProducerInTest extends CamelTestSupport {
+
+    EmbeddedDatabase db;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+                .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void testQueryInArray() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:query");
+        mock.expectedMessageCount(1);
+
+        template.requestBodyAndHeader("direct:query", "Hi there!", "names", new String[]{"Camel", "AMQ"});
+
+        assertMockEndpointsSatisfied();
+
+        List list = mock.getReceivedExchanges().get(0).getIn().getBody(List.class);
+        assertEquals(2, list.size());
+        Map row = (Map) list.get(0);
+        assertEquals("Camel", row.get("PROJECT"));
+        row = (Map) list.get(1);
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+    @Test
+    public void testQueryInList() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:query");
+        mock.expectedMessageCount(1);
+
+        List<String> names = new ArrayList<String>();
+        names.add("Camel");
+        names.add("AMQ");
+
+        template.requestBodyAndHeader("direct:query", "Hi there!", "names", names);
+
+        assertMockEndpointsSatisfied();
+
+        List list = mock.getReceivedExchanges().get(0).getIn().getBody(List.class);
+        assertEquals(2, list.size());
+        Map row = (Map) list.get(0);
+        assertEquals("Camel", row.get("PROJECT"));
+        row = (Map) list.get(1);
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+    @Test
+    public void testQueryInString() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:query");
+        mock.expectedMessageCount(1);
+
+        template.requestBodyAndHeader("direct:query", "Hi there!", "names", "Camel,AMQ");
+
+        assertMockEndpointsSatisfied();
+
+        List list = mock.getReceivedExchanges().get(0).getIn().getBody(List.class);
+        assertEquals(2, list.size());
+        Map row = (Map) list.get(0);
+        assertEquals("Camel", row.get("PROJECT"));
+        row = (Map) list.get(1);
+        assertEquals("AMQ", row.get("PROJECT"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // required for the sql component
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("direct:query")
+                    .to("sql:classpath:sql/selectProjectsIn.sql")
+                    .to("log:query")
+                    .to("mock:query");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/test/resources/sql/selectProjectsIn.sql
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/resources/sql/selectProjectsIn.sql b/components/camel-sql/src/test/resources/sql/selectProjectsIn.sql
new file mode 100644
index 0000000..202bac5
--- /dev/null
+++ b/components/camel-sql/src/test/resources/sql/selectProjectsIn.sql
@@ -0,0 +1,5 @@
+-- this is a comment
+select *
+from projects
+where project in (:#in:names)
+order by id
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/17fd2363/components/camel-sql/src/test/resources/sql/selectProjectsInExpression.sql
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/resources/sql/selectProjectsInExpression.sql b/components/camel-sql/src/test/resources/sql/selectProjectsInExpression.sql
new file mode 100644
index 0000000..1a35d9b
--- /dev/null
+++ b/components/camel-sql/src/test/resources/sql/selectProjectsInExpression.sql
@@ -0,0 +1,5 @@
+-- this is a comment
+select *
+from projects
+where project in (:#in:${header.names})
+order by id
\ No newline at end of file