Posted to commits@camel.apache.org by da...@apache.org on 2018/08/15 15:06:02 UTC

[camel] branch master updated: CAMEL-12734: camel-sql and camel-elsql consumer now support dynamic query parameters in route from. Notice its limited to be mostly about calling beans via simple expressions.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 28771d0  CAMEL-12734: camel-sql and camel-elsql consumer now support dynamic query parameters in route from. Notice its limited to be mostly about calling beans via simple expressions.
28771d0 is described below

commit 28771d02bc8b33c73ed8da40ed1e4d909d224f7e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Aug 15 17:05:43 2018 +0200

    CAMEL-12734: camel-sql and camel-elsql consumer now support dynamic query parameters in route from. Notice its limited to be mostly about calling beans via simple expressions.
---
 .../camel-elsql/src/main/docs/elsql-component.adoc |  94 +++++---
 .../camel/component/elsql/ElsqlEndpoint.java       |   4 +-
 .../camel/component/elsql/ElsqlSqlMapSource.java   |  10 +-
 .../elsql/ElsqlSqlPrepareStatementStrategy.java    |   2 +
 .../elsql/ElSqlConsumerDynamicParameterTest.java   | 110 +++++++++
 .../src/test/resources/elsql/projects.elsql        |   4 +
 .../camel-sql/src/main/docs/sql-component.adoc     | 257 ++++++++++-----------
 .../camel/component/sql/DefaultSqlEndpoint.java    |   4 +-
 .../apache/camel/component/sql/SqlConsumer.java    |  22 +-
 .../apache/camel/component/sql/SqlEndpoint.java    |   2 +
 .../sql/SqlConsumerDynamicParameterTest.java       | 109 +++++++++
 11 files changed, 445 insertions(+), 173 deletions(-)

diff --git a/components/camel-elsql/src/main/docs/elsql-component.adoc b/components/camel-elsql/src/main/docs/elsql-component.adoc
index 3131647..d77af0d 100644
--- a/components/camel-elsql/src/main/docs/elsql-component.adoc
+++ b/components/camel-elsql/src/main/docs/elsql-component.adoc
@@ -17,21 +17,21 @@ Maven users will need to add the following dependency to their `pom.xml`
 for this component:
 
 [source,xml]
-------------------------------------------------------------
+----
 <dependency>
     <groupId>org.apache.camel</groupId>
     <artifactId>camel-elsql</artifactId>
     <version>x.x.x</version>
     <!-- use the same version as your Camel core version -->
 </dependency>
-------------------------------------------------------------
+----
 
 The SQL component uses the following endpoint URI notation:
 
-[source,java]
------------------------------------
+[source,text]
+----
 sql:elSqlName:resourceUri[?options]
------------------------------------
+----
 
 You can append query options to the URI in the following
 format, `?option=value&option=value&...`
@@ -47,7 +47,7 @@ expression.
 
 If a named parameter cannot be resolved, then an exception is thrown.
 
-### Options
+=== Options
 
 // component options: START
 The ElSQL component supports 5 options, which are listed below.
@@ -117,9 +117,9 @@ with the following path and query parameters:
 | *batch* (producer) | Enables or disables batch mode | false | boolean
 | *noop* (producer) | If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing | false | boolean
 | *useMessageBodyForSql* (producer) | Whether to use the message body as the SQL and then headers for parameters. If this option is enabled then the SQL in the uri is not used. | false | boolean
-| *alwaysPopulateStatement* (producer) | If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1 or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters. | false | boolean
-| *parametersCount* (producer) | If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead. |  | int
+| *alwaysPopulateStatement* (advanced) | If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1 or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters. | false | boolean
 | *elSqlConfig* (advanced) | To use a specific configured ElSqlConfig. It may be better to use the databaseVendor option instead. |  | ElSqlConfig
+| *parametersCount* (advanced) | If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead. |  | int
 | *placeholder* (advanced) | Specifies a character that will be replaced to in SQL query. Notice, that it is simple String.replaceAll() operation and no SQL parsing is involved (quoted strings will also change). | # | String
 | *prepareStatementStrategy* (advanced) | Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement. |  | SqlPrepareStatement Strategy
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
@@ -161,7 +161,7 @@ The component supports 6 options, which are listed below.
 // spring-boot-auto-configure options: END
 
 
-### Result of the query
+=== Result of the query
 
 For `select` operations, the result is an instance of
 `List<Map<String, Object>>` type, as returned by the
@@ -175,13 +175,13 @@ headers, it provides a concise syntax for querying a sequence or some
 other small value into a header.  It is convenient to use outputHeader
 and outputType together:
 
-### Header values
+=== Header values
 
 When performing `update` operations, the SQL Component stores the update
 count in the following message headers:
 
 [width="100%",cols="10%,90%",options="header",]
-|=======================================================================
+|===
 |Header |Description
 
 |`CamelSqlUpdateCount` |The number of rows updated for `update` operations, returned as an
@@ -189,9 +189,9 @@ count in the following message headers:
 
 |`CamelSqlRowCount` |The number of rows returned for `select` operations, returned as an
 `Integer` object.
-|=======================================================================
+|===
 
-#### Sample
+==== Sample
 
 In the given route below, we want to get all the projects from the
 projects table. Notice the SQL query has 2 named parameters, :#lic and
@@ -203,53 +203,85 @@ constant value +
  for the named parameters:
 
 [source,java]
------------------------------------------------
+----
    from("direct:projects")
      .setHeader("lic", constant("ASF"))
      .setHeader("min", constant(123))
      .to("elsql:projects:com/foo/orders.elsql")
------------------------------------------------
+----
 
 And the https://github.com/OpenGamma/ElSql[elsql] mapping file
 
-[source,java]
-------------------------------------
+[source,sql]
+----
 @NAME(projects)
   SELECT *
   FROM projects
   WHERE license = :lic AND id > :min
   ORDER BY id
-------------------------------------
+----
 
 Though if the message body is a `java.util.Map` then the named
 parameters will be taken from the body.
 
 [source,java]
------------------------------------------------
+----
    from("direct:projects")
      .to("elsql:projects:com/foo/orders.elsql")
------------------------------------------------
+----
+
+=== Using expression parameters in producers
 
 In from Camel 2.16.1 onwards you can use Simple expressions as well,
 which allows to use an OGNL like notation on the message body, where it
 assumes to have `getLicense` and `getMinimum` methods:
 
-[source,java]
-------------------------------------------------------------
+[source,sql]
+----
 @NAME(projects)
   SELECT *
   FROM projects
   WHERE license = :${body.license} AND id > :${body.minimum}
   ORDER BY id
-------------------------------------------------------------
+----
+
+==== Using expression parameters in consumers
+
+*Available as of Camel 2.23*
+
+When using the ElSql component as a consumer, you can now also use expression parameters (simple language)
+to build dynamic query parameters, such as calling a method on a bean to retrieve an id, a date, or a similar value.
+
+For example in the sample below we call the nextId method on the bean myIdGenerator:
 
-### See Also
+[source,sql]
+----
+@NAME(projectsByIdBean)
+  SELECT *
+  FROM projects
+  WHERE id = :${bean#myIdGenerator.nextId}
+----
+
+IMPORTANT: In the bean syntax above, you must use `#` instead of `:` in the simple expression.
+This is because the Spring query parameter parser is in use, and it separates parameters on a colon.
+Also note that the Spring query parser invokes the bean twice for each query.
+
+And the bean has the following method:
+
+[source,java]
+----
+public static class MyIdGenerator {
+
+    private int id = 1;
+
+    public int nextId() {
+        // spring will call this twice, one for initializing query and 2nd for actual value
+        id++;
+        return id / 2;
+    }
+----
 
-* Configuring Camel
-* Component
-* Endpoint
-* Getting Started
+Notice that there is no existing `Exchange` with message body and headers, so
+the simple expressions you can use in the consumer are mostly useful for calling
+bean methods, as in this example.
 
-* <<sql-component,SQL Component>>
-* <<mybatis-component,MyBatis>>
-* <<jdbc-component,JDBC>>
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
index c8bb40c..7b10470 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
@@ -26,6 +26,7 @@ import com.opengamma.elsql.ElSqlConfig;
 import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.sql.DefaultSqlEndpoint;
@@ -82,7 +83,8 @@ public class ElsqlEndpoint extends DefaultSqlEndpoint {
         final SqlProcessingStrategy proStrategy = new ElsqlSqlProcessingStrategy(elSql);
         final SqlPrepareStatementStrategy preStategy = new ElsqlSqlPrepareStatementStrategy();
 
-        final SqlParameterSource param = new EmptySqlParameterSource();
+        final Exchange dummy = createExchange();
+        final SqlParameterSource param = new ElsqlSqlMapSource(dummy, null);
         final String sql = elSql.getSql(elsqlName, new SpringSqlParams(param));
         LOG.debug("ElsqlConsumer @{} using sql: {}", elsqlName, sql);
 
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
index 263ed8c..5b24731 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlMapSource.java
@@ -49,7 +49,7 @@ public class ElsqlSqlMapSource extends AbstractSqlParameterSource {
     public boolean hasValue(String paramName) {
         if ("body".equals(paramName)) {
             return true;
-        } else if (paramName.startsWith("${") && paramName.endsWith("}")) {
+        } else if ((paramName.startsWith("$simple{") || paramName.startsWith("${")) && paramName.endsWith("}")) {
             return true;
         } else {
             return bodyMap.containsKey(paramName) || headersMap.containsKey(paramName);
@@ -61,8 +61,14 @@ public class ElsqlSqlMapSource extends AbstractSqlParameterSource {
         Object answer;
         if ("body".equals(paramName)) {
             answer = exchange.getIn().getBody();
-        } else if (paramName.startsWith("${") && paramName.endsWith("}")) {
+        } else if ((paramName.startsWith("$simple{") || paramName.startsWith("${")) && paramName.endsWith("}")) {
             // its a simple language expression
+
+            // spring org.springframework.jdbc.core.namedparam.NamedParameterUtils.PARAMETER_SEPARATORS
+            // uses : as parameter separator and we may use colon in simple languages as well such as bean:foo
+            // so we have to use # instead and replace them back
+            paramName = paramName.replace('#', ':');
+
             answer = SimpleLanguage.expression(paramName).evaluate(exchange, Object.class);
         } else {
             answer = bodyMap.get(paramName);
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java
index 0b998f2..fbcedbc 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlSqlPrepareStatementStrategy.java
@@ -20,8 +20,10 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Iterator;
 
+import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 
 public class ElsqlSqlPrepareStatementStrategy implements SqlPrepareStatementStrategy {
 
diff --git a/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDynamicParameterTest.java b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDynamicParameterTest.java
new file mode 100644
index 0000000..12fcd6d
--- /dev/null
+++ b/components/camel-elsql/src/test/java/org/apache/camel/component/elsql/ElSqlConsumerDynamicParameterTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elsql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ *
+ */
+public class ElSqlConsumerDynamicParameterTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+    MyIdGenerator idGenerator = new MyIdGenerator();
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myIdGenerator", idGenerator);
+
+        // this is the database we create with some initial data for our unit test
+        db = new EmbeddedDatabaseBuilder()
+                .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        jndi.bind("dataSource", db);
+
+        return jndi;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void testDynamicConsume() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMinimumMessageCount(3);
+
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = mock.getReceivedExchanges();
+
+        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+        assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+        // and the bean id should be > 1
+        assertTrue("Id counter should be > 1", idGenerator.getId() > 1);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("elsql:projectsByIdBean:elsql/projects.elsql?dataSource=#dataSource&consumer.initialDelay=0&consumer.delay=50")
+                    .routeId("foo").noAutoStartup()
+                    .to("mock:result");
+            }
+        };
+    }
+
+    public static class MyIdGenerator {
+
+        private int id = 1;
+
+        public int nextId() {
+            // spring will call this twice, one for initializing query and 2nd for actual value
+            id++;
+            return id / 2;
+        }
+
+        public int getId() {
+            return id;
+        }
+    }
+}
diff --git a/components/camel-elsql/src/test/resources/elsql/projects.elsql b/components/camel-elsql/src/test/resources/elsql/projects.elsql
index 995e272..ce838b6 100644
--- a/components/camel-elsql/src/test/resources/elsql/projects.elsql
+++ b/components/camel-elsql/src/test/resources/elsql/projects.elsql
@@ -12,6 +12,10 @@
   SELECT *
   FROM projects
   ORDER BY id
+@NAME(projectsByIdBean)
+  SELECT *
+  FROM projects
+  WHERE id = :${bean#myIdGenerator.nextId}
 @NAME(projectById)
   SELECT *
   FROM projects
diff --git a/components/camel-sql/src/main/docs/sql-component.adoc b/components/camel-sql/src/main/docs/sql-component.adoc
index 9bcf731..fa9309d 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -15,14 +15,14 @@ Maven users will need to add the following dependency to their `pom.xml`
 for this component:
 
 [source,xml]
-------------------------------------------------------------
+----
 <dependency>
     <groupId>org.apache.camel</groupId>
     <artifactId>camel-sql</artifactId>
     <version>x.x.x</version>
     <!-- use the same version as your Camel core version -->
 </dependency>
-------------------------------------------------------------
+----
 
 The SQL component also supports:
 
@@ -32,7 +32,7 @@ further below.
 * a JDBC based repository for the Aggregator EIP
 pattern. See further below.
 
-### URI format
+=== URI format
 
 WARNING:From Camel 2.11 onwards this component can create both consumer (e.g.
 `from()`) and producer endpoints (e.g. `to()`).
@@ -43,18 +43,18 @@ http://camel.apache.org/transactional-client.html[Transactional Client].
 
 The SQL component uses the following endpoint URI notation:
 
-[source,java]
-----------------------------------------------------------
+[source,text]
+----
 sql:select * from table where id=# order by name[?options]
-----------------------------------------------------------
+----
 
 From Camel 2.11 onwards you can use named parameters by using
 :`#name_of_the_parameter` style as shown:
 
-[source,java]
----------------------------------------------------------------
+[source,text]
+----
 sql:select * from table where id=:#myId order by name[?options]
----------------------------------------------------------------
+----
 
 When using named parameters, Camel will lookup the names from, in the
 given precedence: +
@@ -66,10 +66,10 @@ If a named parameter cannot be resolved, then an exception is thrown.
 From *Camel 2.14* onward you can use Simple expressions as parameters as
 shown:
 
-[source,java]
----------------------------------------------------------------------------
+[source,text]
+----
 sql:select * from table where id=:#${property.myId} order by name[?options]
----------------------------------------------------------------------------
+----
 
 Notice that the standard `?` symbol that denotes the parameters to an
 SQL query is substituted with the `pass:[#]` symbol, because the `?` symbol is
@@ -79,15 +79,15 @@ be configured on endpoint basis.
 From *Camel 2.17* onwards you can externalize your SQL queries to files
 in the classpath or file system as shown:
 
-[source,java]
----------------------------------------
+[source,text]
+----
 sql:classpath:sql/myquery.sql[?options]
----------------------------------------
+----
 
 And the myquery.sql file is in the classpath and is just a plain text
 
-[source,java]
--------------------------
+[source,sql]
+----
 -- this is a comment
 select *
 from table
@@ -95,7 +95,7 @@ where
   id = :#${property.myId}
 order by
   name
--------------------------
+----
 
 In the file you can use multilines and format the SQL as you wish. And
 also use comments such as the – dash line.
@@ -103,7 +103,7 @@ also use comments such as the – dash line.
 You can append query options to the URI in the following format,
 `?option=value&option=value&...`
 
-### Options
+=== Options
 
 
 
@@ -174,8 +174,8 @@ with the following path and query parameters:
 | *batch* (producer) | Enables or disables batch mode | false | boolean
 | *noop* (producer) | If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing | false | boolean
 | *useMessageBodyForSql* (producer) | Whether to use the message body as the SQL and then headers for parameters. If this option is enabled then the SQL in the uri is not used. | false | boolean
-| *alwaysPopulateStatement* (producer) | If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1 or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters. | false | boolean
-| *parametersCount* (producer) | If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead. |  | int
+| *alwaysPopulateStatement* (advanced) | If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1 or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters. | false | boolean
+| *parametersCount* (advanced) | If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead. |  | int
 | *placeholder* (advanced) | Specifies a character that will be replaced to in SQL query. Notice, that it is simple String.replaceAll() operation and no SQL parsing is involved (quoted strings will also change). | # | String
 | *prepareStatementStrategy* (advanced) | Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement. |  | SqlPrepareStatement Strategy
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
@@ -216,7 +216,7 @@ The component supports 4 options, which are listed below.
 
 
 
-### Treatment of the message body
+=== Treatment of the message body
 
 The SQL component tries to convert the message body to an object of
 `java.util.Iterator` type and then uses this iterator to fill the query
@@ -241,7 +241,7 @@ parameters must be provided in a header with the
 key SqlConstants.SQL_PARAMETERS. This allows the SQL component to work
 more dynamic as the SQL query is from the message body.
 
-### Result of the query
+=== Result of the query
 
 For `select` operations, the result is an instance of
 `List<Map<String, Object>>` type, as returned by the
@@ -257,13 +257,13 @@ other small value into a header.  It is convenient to use outputHeader
 and outputType together:
 
 [source,java]
--------------------------------------------------------------------------------------------
+----
 from("jms:order.inbox")
     .to("sql:select order_seq.nextval from dual?outputHeader=OrderId&outputType=SelectOne")
     .to("jms:order.booking");
--------------------------------------------------------------------------------------------
+----
 
-### Using StreamList
+=== Using StreamList
 
 From*Camel 2.18* onwards the producer supports outputType=StreamList
 that uses an iterator to stream the output of the query. This allows to
@@ -272,7 +272,7 @@ the Splitter EIP to process each row one at a time,
 and load data from the database as needed.
 
 [source,java]
------------------------------------------------------------------------------------------------------------------------------------
+----
 from("direct:withSplitModel")
         .to("sql:select * from projects order by id?outputType=StreamList&outputClass=org.apache.camel.component.sql.ProjectModel")
         .to("log:stream")
@@ -280,17 +280,16 @@ from("direct:withSplitModel")
             .to("log:row")
             .to("mock:result")
         .end();
------------------------------------------------------------------------------------------------------------------------------------
-
+----
  
 
-### Header values
+=== Header values
 
 When performing `update` operations, the SQL Component stores the update
 count in the following message headers:
 
 [width="100%",cols="10%,90%",options="header",]
-|=======================================================================
+|===
 |Header |Description
 
 |`CamelSqlUpdateCount` |The number of rows updated for `update` operations, returned as an
@@ -304,22 +303,22 @@ outputType=StreamList.
 |`CamelSqlQuery` |*Camel 2.8:* Query to execute. This query takes precedence over the
 query specified in the endpoint URI. Note that query parameters in the
 header _are_ represented by a `?` instead of a `pass:[#]` symbol
-|=======================================================================
+|===
 
 When performing `insert` operations, the SQL Component stores the rows
 with the generated keys and number of these rown in the following
 message headers (*Available as of Camel 2.12.4, 2.13.1*):
 
 [width="100%",cols="10%,90%",options="header",]
-|=======================================================================
+|===
 |Header |Description
 
 |CamelSqlGeneratedKeysRowCount |The number of rows in the header that contains generated keys.
 
 |CamelSqlGeneratedKeyRows |Rows that contains the generated keys (a list of maps of keys).
-|=======================================================================
+|===
 
-### Generated keys
+=== Generated keys
 
 *Available as of Camel 2.12.4, 2.13.1 and 2.14 *
 
@@ -333,45 +332,16 @@ the table above.
 You can see more details in this
 https://gitbox.apache.org/repos/asf?p=camel.git;a=blob;f=components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java;h=54c19b7332bb0aa81ee24ff3d3a66885a6b9e9aa;hb=HEAD[unit test].
 
-### Configuration
+=== DataSource
 
 You can now set a reference to a `DataSource` in the URI directly:
 
-[source,java]
-----------------------------------------------------------------
+[source,text]
+----
 sql:select * from table where id=# order by name?dataSource=myDS
-----------------------------------------------------------------
-
-### Sample
-
-In the sample below we execute a query and retrieve the result as a
-`List` of rows, where each row is a `Map<String, Object` and the key is
-the column name.
-
-First, we set up a table to use for our sample. As this is based on an
-unit test, we do it in java:
-
-The SQL script `createAndPopulateDatabase.sql` we execute looks like as
-described below:
-
-Then we configure our route and our `sql` component. Notice that we use
-a `direct` endpoint in front of the `sql` endpoint. This allows us to
-send an exchange to the `direct` endpoint with the URI, `direct:simple`,
-which is much easier for the client to use than the long `sql:` URI.
-Note that the `DataSource` is looked up up in the registry, so we can
-use standard Spring XML to configure our `DataSource`.
-
-And then we fire the message into the `direct` endpoint that will route
-it to our `sql` component that queries the database.
-
-We could configure the `DataSource` in Spring XML as follows:
-
-[source,xml]
------------------------------------------------------------
- <jee:jndi-lookup id="myDS" jndi-name="jdbc/myDataSource"/>
------------------------------------------------------------
+----
 
-#### Using named parameters
+=== Using named parameters
 
 *Available as of Camel 2.11*
 
@@ -384,23 +354,23 @@ constant value +
  for the named parameters:
 
 [source,java]
----------------------------------------------------------------------------------------
+----
    from("direct:projects")
      .setHeader("lic", constant("ASF"))
      .setHeader("min", constant(123))
      .to("sql:select * from projects where license = :#lic and id > :#min order by id")
----------------------------------------------------------------------------------------
+----
 
 Though if the message body is a `java.util.Map` then the named
 parameters will be taken from the body.
 
 [source,java]
----------------------------------------------------------------------------------------
+----
    from("direct:projects")
      .to("sql:select * from projects where license = :#lic and id > :#min order by id")
----------------------------------------------------------------------------------------
+----
 
-#### Using expression parameters
+=== Using expression parameters in producers
 
 *Available as of Camel 2.14*
 
@@ -409,14 +379,46 @@ database. It uses the body of the exchange for defining the license and
 uses the value of a property as the second parameter.
 
 [source,java]
-----------------------------------------------------------------------------------------------------
+----
 from("direct:projects")
   .setBody(constant("ASF"))
   .setProperty("min", constant(123))
   .to("sql:select * from projects where license = :#${body} and id > :#${property.min} order by id")
-----------------------------------------------------------------------------------------------------
+----
+
+==== Using expression parameters in consumers
+
+*Available as of Camel 2.23*
+
+When using the SQL component as a consumer, you can also use expression parameters (simple language)
+to build dynamic query parameters, for example by calling a method on a bean to retrieve an id or a date.
 
-#### Using IN queries with dynamic values
+For example, the route below calls the `nextId` method on the bean `myIdGenerator`:
+
+[source,java]
+----
+from("sql:select * from projects where id = :#${bean:myIdGenerator.nextId}")
+    .to("mock:result");
+----
+
+And the bean has the following method:
+
+[source,java]
+----
+public static class MyIdGenerator {
+    private int id = 1;
+
+    public int nextId() {
+        return id++;
+    }
+}
+----
+
+Notice that the consumer has no existing `Exchange` with a message body and headers, so
+the simple expressions you can use in the consumer are mostly useful for calling
+bean methods, as in this example.
+
+=== Using IN queries with dynamic values
 
 *Available as of Camel 2.17*
 
@@ -431,34 +433,33 @@ To use IN you need to:
 
 An example explains this better. The following query is used:
 
-[source,java]
------------------------------
+[source,sql]
+----
 -- this is a comment
 select *
 from projects
 where project in (:#in:names)
 order by id
------------------------------
+----
 
 In the following route:
 
 [source,java]
--------------------------------------------------
+----
 from("direct:query")
     .to("sql:classpath:sql/selectProjectsIn.sql")
     .to("log:query")
     .to("mock:query");
--------------------------------------------------
+----
 
 Then the IN query can use a header with the key names with the dynamic
 values such as:
 
 [source,java]
---------------------------------------------------------------------------------------------------
+----
 // use an array
 template.requestBodyAndHeader("direct:query", "Hi there!", "names", new String[]{"Camel", "AMQ"});
 
-
 // use a list
 List<String> names = new ArrayList<String>();
 names.add("Camel");
@@ -466,55 +467,51 @@ names.add("AMQ");
 
 template.requestBodyAndHeader("direct:query", "Hi there!", "names", names);
 
-
 // use a string separated values with comma
 template.requestBodyAndHeader("direct:query", "Hi there!", "names", "Camel,AMQ");
---------------------------------------------------------------------------------------------------
+----
 
 The query can also be specified in the endpoint instead of being
 externalized (notice that externalizing makes maintaining the SQL
 queries easier)
 
 [source,java]
--------------------------------------------------------------------------------
+----
 from("direct:query")
     .to("sql:select * from projects where project in (:#in:names) order by id")
     .to("log:query")
     .to("mock:query");
--------------------------------------------------------------------------------
-
+----
  
 
-### Using the JDBC based idempotent repository
+=== Using the JDBC based idempotent repository
 
 *Available as of Camel 2.7*: In this section we will use the JDBC based
 idempotent repository.
 
-
 TIP:*Abstract class*
 From Camel 2.9 onwards there is an abstract class
 `org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository`
 you can extend to build custom JDBC idempotent repository.
 
- 
 
 First we have to create the database table which will be used by the
 idempotent repository. For *Camel 2.7*, we use the following schema:
 
 [source,sql]
--------------------------------------------------------------------------------
+----
 CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
 messageId VARCHAR(100) )
--------------------------------------------------------------------------------
+----
  
 
 In *Camel 2.8*, we added the createdAt column:
 
 [source,sql]
--------------------------------------------------------------------------------
+----
 CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255),
 messageId VARCHAR(100), createdAt TIMESTAMP )
--------------------------------------------------------------------------------
+----
 
 WARNING:The SQL Server *TIMESTAMP* type is a fixed-length binary-string type. It
 does not map to any of the JDBC time types: *DATE*, *TIME*, or
@@ -527,7 +524,7 @@ Starting with *Camel 2.9.1* you have a few options to tune the
 your needs:
 
 [width="100%",cols="10%,10%,80%",options="header",]
-|=======================================================================
+|===
 |Parameter |Default Value |Description
 
 |createTableIfNotExists |true |Defines whether or not Camel should try to create the table if it
@@ -552,13 +549,12 @@ second one is the message id (`String`) and the third one is the
 timestamp (`java.sql.Timestamp`) when this entry was added to the
 repository.
 
-|deleteString |DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId
-= ? |The statement which is used to delete the entry from the database. It
-takes two parameter. This first one is the processor name (`String`) and
+|deleteString |DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? |The statement which is used to delete the entry from the database.
+It takes two parameters. The first one is the processor name (`String`) and
 the second one is the message id (`String`).
-|=======================================================================
+|===
 
-Using the JDBC based aggregation repository
+=== Using the JDBC based aggregation repository
 
 *Available as of Camel 2.6*
 
@@ -590,7 +586,7 @@ You can see some examples in the unit tests of camel-sql, for example
 https://svn.apache.org/repos/asf/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java[this
 test].
 
-Database
+==== Database
 
 To be operational, each aggregator uses two table: the aggregation and
 completed one. By convention the completed has the same name as the
@@ -611,7 +607,7 @@ Here is the SQL query used to create the tables, just replace
 `"aggregation"` with your aggregator repository name.
 
 [source,sql]
--------------------------------------------------------------------------------
+-----
 CREATE TABLE aggregation (
  id varchar(255) NOT NULL,
  exchange blob NOT NULL,
@@ -622,10 +618,10 @@ CREATE TABLE aggregation_completed (
  exchange blob NOT NULL,
  constraint aggregation_completed_pk PRIMARY KEY (id)
 );
--------------------------------------------------------------------------------
+-----
 
 
-Storing body and headers as text
+=== Storing body and headers as text
 
 *Available as of Camel 2.11*
 
@@ -635,7 +631,7 @@ store the body, and the following two headers `companyName` and
 `accountName` use the following SQL:
 
 [source,sql]
--------------------------------------------------------------------------------
+----
 CREATE TABLE aggregationRepo3 (
  id varchar(255) NOT NULL,
  exchange blob NOT NULL,
@@ -652,14 +648,13 @@ CREATE TABLE aggregationRepo3_completed (
  accountName varchar(1000),
  constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
 );
--------------------------------------------------------------------------------
+----
  
-
 And then configure the repository to enable this behavior as shown
 below:
 
 [source,xml]
--------------------------------------------------------------------------------
+----
 <bean id="repo3"
   class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
   <property name="repositoryName" value="aggregationRepo3"/>
@@ -674,9 +669,9 @@ below:
     </list>
   </property>
 </bean>
--------------------------------------------------------------------------------
+----
 
-Codec (Serialization)
+==== Codec (Serialization)
 
 Since they can contain any type of payload, Exchanges are not
 serializable by design. It is converted into a byte array to be stored
@@ -691,18 +686,18 @@ the `currentThread` one. The benefit is to be able to load classes
 exposed by other bundles. This allows the exchange body and headers to
 have custom types object references.
 
-Transaction
+==== Transaction
 
 A Spring `PlatformTransactionManager` is required to orchestrate
 transaction.
 
-Service (Start/Stop)
+==== Service (Start/Stop)
 
 The `start` method verify the connection of the database and the
 presence of the required tables. If anything is wrong it will fail
 during starting.
 
-Aggregator configuration
+==== Aggregator configuration
 
 Depending on the targeted environment, the aggregator might need some
 configuration. As you already know, each aggregator should have its own
@@ -713,7 +708,7 @@ your database system, it can be injected with the `lobHandler` property.
 Here is the declaration for Oracle:
 
 [source,xml]
--------------------------------------------------------------------------------
+----
 <bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
   <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
 </bean>
@@ -727,9 +722,9 @@ Here is the declaration for Oracle:
   <!-- Only with Oracle, else use default -->
   <property name="lobHandler" ref="lobHandler"/>
 </bean>
--------------------------------------------------------------------------------
+----
  
-Optimistic locking
+==== Optimistic locking
 
 From *Camel 2.12* onwards you can turn on `optimisticLocking` and use
 this JDBC based aggregation repository in a clustered environment where
@@ -766,7 +761,7 @@ Here is an example, where we define 2 extra FQN class names from the
 JDBC vendor.
 
 [source,xml]
--------------------------------------------------------------------------------
+-----
 <bean id="repo"
 class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
   <property name="transactionManager" ref="transactionManager"/>
@@ -783,26 +778,26 @@ class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
     </util:set>
   </property>
 </bean>
--------------------------------------------------------------------------------
+-----
 
-### Camel Sql Starter
+=== Camel Sql Starter
 
 A starter module is available to spring-boot users. When using the starter,
 the `DataSource` can be directly configured using spring-boot properties.
 
-[source]
-------------------------------------------------------
+[source,text]
+----
 # Example for a mysql datasource
 spring.datasource.url=jdbc:mysql://localhost/test
 spring.datasource.username=dbuser
 spring.datasource.password=dbpass
 spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-------------------------------------------------------
+----
 
 To use this feature, add the following dependencies to your spring boot pom.xml file:
 
 [source,xml]
-------------------------------------------------------
+----
 <dependency>
     <groupId>org.apache.camel</groupId>
     <artifactId>camel-sql-starter</artifactId>
@@ -814,17 +809,7 @@ To use this feature, add the following dependencies to your spring boot pom.xml
     <artifactId>spring-boot-starter-jdbc</artifactId>
     <version>${spring-boot-version}</version>
 </dependency>
-------------------------------------------------------
+----
 
 You should also include the specific database driver, if needed.
 
-### See Also
-
-* Configuring Camel
-* Component
-* Endpoint
-* Getting Started
-
-SQL Stored Procedure
-
-<<jdbc-component,JDBC>>
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
index 53711cc..8f6c657 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
@@ -80,7 +80,7 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
     private boolean breakBatchOnConsumeFail;
     @UriParam(defaultValue = "true", description = "Whether to allow using named parameters in the queries.")
     private boolean allowNamedParameters = true;
-    @UriParam(label = "producer,advanced",
+    @UriParam(label = "advanced",
             description = "If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, "
                     + "also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1"
                     + " or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters.")
@@ -99,7 +99,7 @@ public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
     private SqlOutputType outputType = SqlOutputType.SelectList;
     @UriParam(description = "Specify the full package and class name to use as conversion when outputType=SelectOne.")
     private String outputClass;
-    @UriParam(label = "producer,advanced", description = "If set greater than zero, then Camel will use this count value of parameters to replace instead of"
+    @UriParam(label = "advanced", description = "If set greater than zero, then Camel will use this count value of parameters to replace instead of"
             + " querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead.")
     private int parametersCount;
     @UriParam(label = "producer", description = "If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing")
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 20b031c..48046d5 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.sql;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -56,6 +57,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
     private boolean routeEmptyResultSet;
     private int expectedUpdateCount = -1;
     private boolean breakBatchOnConsumeFail;
+    private int parametersCount;
+    private boolean alwaysPopulateStatement;
 
     private static final class DataHolder {
         private Exchange exchange;
@@ -106,7 +109,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
         shutdownRunningTask = null;
         pendingExchanges = 0;
 
-        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(resolvedQuery, getEndpoint().isAllowNamedParameters(), null);
+        final Exchange dummy = getEndpoint().createExchange();
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(resolvedQuery, getEndpoint().isAllowNamedParameters(), dummy);
 
         log.trace("poll: {}", preparedQuery);
         final PreparedStatementCallback<Integer> callback = new PreparedStatementCallback<Integer>() {
@@ -114,6 +118,14 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
             public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
                 Queue<DataHolder> answer = new LinkedList<>();
 
+                int expected = parametersCount > 0 ? parametersCount : ps.getParameterMetaData().getParameterCount();
+
+                // only populate if really needed
+                if (alwaysPopulateStatement || expected > 0) {
+                    Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(resolvedQuery, preparedQuery, expected, dummy, null);
+                    sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
+                }
+
                 log.debug("Executing query: {}", preparedQuery);
                 ResultSet rs = ps.executeQuery();
                 SqlOutputType outputType = getEndpoint().getOutputType();
@@ -378,4 +390,12 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
             jdbcTemplate.setMaxRows(maxMessagesPerPoll);
         }
     }
+
+    public void setParametersCount(int parametersCount) {
+        this.parametersCount = parametersCount;
+    }
+
+    public void setAlwaysPopulateStatement(boolean alwaysPopulateStatement) {
+        this.alwaysPopulateStatement = alwaysPopulateStatement;
+    }
 }
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
index fd8cbdd..f741245 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
@@ -56,6 +56,8 @@ public class SqlEndpoint extends DefaultSqlEndpoint {
         consumer.setExpectedUpdateCount(getExpectedUpdateCount());
         consumer.setUseIterator(isUseIterator());
         consumer.setRouteEmptyResultSet(isRouteEmptyResultSet());
+        consumer.setParametersCount(getParametersCount());
+        consumer.setAlwaysPopulateStatement(isAlwaysPopulateStatement());
         configureConsumer(consumer);
         return consumer;
     }
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
new file mode 100644
index 0000000..bf1c7a4
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDynamicParameterTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+public class SqlConsumerDynamicParameterTest extends CamelTestSupport {
+
+    EmbeddedDatabase db;
+    MyIdGenerator idGenerator = new MyIdGenerator();
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+            .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myIdGenerator", idGenerator);
+        return jndi;
+    }
+
+    @Test
+    public void testDynamicConsume() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = mock.getReceivedExchanges();
+
+        assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+        assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+        // and the bean id should be > 1
+        assertTrue("Id counter should be > 1", idGenerator.getId() > 1);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("sql:select * from projects where id = :#${bean:myIdGenerator.nextId}?consumer.initialDelay=0&consumer.delay=50")
+                    .routeId("foo").noAutoStartup()
+                    .to("mock:result");
+            }
+        };
+    }
+
+    public static class MyIdGenerator {
+
+        private int id = 1;
+
+        public int nextId() {
+            return id++;
+        }
+
+        public int getId() {
+            return id;
+        }
+    }
+}