You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/05 11:50:29 UTC

[04/10] camel git commit: CAMEL-9162: camel-elsql component

CAMEL-9162: camel-elsql component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0b71c42f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0b71c42f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0b71c42f

Branch: refs/heads/master
Commit: 0b71c42f2c2e1704fc61ca2bcd7385ef0ff84677
Parents: f6cefc7
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Oct 5 08:21:20 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Oct 5 10:54:54 2015 +0200

----------------------------------------------------------------------
 .../camel/component/elsql/ElsqlConsumer.java    |   4 +-
 .../camel/component/elsql/ElsqlEndpoint.java    |   6 +-
 .../camel/component/elsql/ElsqlProducer.java    |   3 +-
 .../camel/component/sql/DefaultSqlEndpoint.java | 437 +++++++++++++++++++
 .../apache/camel/component/sql/SqlConsumer.java |   2 +-
 .../apache/camel/component/sql/SqlEndpoint.java | 428 +-----------------
 6 files changed, 456 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
index 0f1c6d7..9459241 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlConsumer.java
@@ -17,15 +17,15 @@
 package org.apache.camel.component.elsql;
 
 import org.apache.camel.Processor;
+import org.apache.camel.component.sql.DefaultSqlEndpoint;
 import org.apache.camel.component.sql.SqlConsumer;
-import org.apache.camel.component.sql.SqlEndpoint;
 import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
 import org.apache.camel.component.sql.SqlProcessingStrategy;
 import org.springframework.jdbc.core.JdbcTemplate;
 
 public class ElsqlConsumer extends SqlConsumer {
 
-    public ElsqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query,
+    public ElsqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query,
                          SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
         super(endpoint, processor, jdbcTemplate, query, sqlPrepareStatementStrategy, sqlProcessingStrategy);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
index 352dc37..a07b93e 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlEndpoint.java
@@ -24,7 +24,7 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.sql.SqlEndpoint;
+import org.apache.camel.component.sql.DefaultSqlEndpoint;
 import org.apache.camel.component.sql.SqlPrepareStatementStrategy;
 import org.apache.camel.component.sql.SqlProcessingStrategy;
 import org.apache.camel.spi.Metadata;
@@ -37,7 +37,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
 
 @UriEndpoint(scheme = "elsql", title = "SQL", syntax = "elsql:elsqlName:resourceUri", consumerClass = ElsqlConsumer.class, label = "database,sql")
-public class ElsqlEndpoint extends SqlEndpoint {
+public class ElsqlEndpoint extends DefaultSqlEndpoint {
 
     private volatile ElSql elSql;
     private NamedParameterJdbcTemplate namedJdbcTemplate;
@@ -51,7 +51,7 @@ public class ElsqlEndpoint extends SqlEndpoint {
     private ElSqlConfig elSqlConfig;
 
     public ElsqlEndpoint(String uri, Component component, NamedParameterJdbcTemplate namedJdbcTemplate, String elsqlName, String resourceUri) {
-        super(uri, component, null, null);
+        super(uri, component, null);
         this.elsqlName = elsqlName;
         this.resourceUri = resourceUri;
         this.namedJdbcTemplate = namedJdbcTemplate;

http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
index fa3f5df..4353d9a 100644
--- a/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
+++ b/components/camel-elsql/src/main/java/org/apache/camel/component/elsql/ElsqlProducer.java
@@ -25,7 +25,6 @@ import com.opengamma.elsql.ElSql;
 import com.opengamma.elsql.SpringSqlParams;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.sql.SqlConstants;
-import org.apache.camel.component.sql.SqlEndpoint;
 import org.apache.camel.component.sql.SqlOutputType;
 import org.apache.camel.impl.DefaultProducer;
 import org.springframework.dao.DataAccessException;
@@ -41,7 +40,7 @@ public class ElsqlProducer extends DefaultProducer {
     private final String elSqlName;
     private final NamedParameterJdbcTemplate jdbcTemplate;
 
-    public ElsqlProducer(SqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate) {
+    public ElsqlProducer(ElsqlEndpoint endpoint, ElSql elSql, String elSqlName, NamedParameterJdbcTemplate jdbcTemplate) {
         super(endpoint);
         this.elSql = elSql;
         this.elSqlName = elSqlName;

http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
new file mode 100644
index 0000000..c67ecc1
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlEndpoint.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.sql.ResultSet;
+import java.sql.SQLDataException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import javax.sql.DataSource;
+
+import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultPollingEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
+import org.springframework.jdbc.core.ColumnMapRowMapper;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.jdbc.core.RowMapperResultSetExtractor;
+
+/**
+ * Base class for SQL endpoints.
+ */
+public abstract class DefaultSqlEndpoint extends DefaultPollingEndpoint {
+    private JdbcTemplate jdbcTemplate;
+
+    @UriParam(description = "Sets the reference to a DataSource to lookup from the registry, to use for communicating with the database.")
+    @Deprecated
+    private String dataSourceRef;
+    @UriParam(description = "Sets the DataSource to use to communicate with the database.")
+    private DataSource dataSource;
+    @UriParam(label = "producer", description = "Enables or disables batch mode")
+    private boolean batch;
+    @UriParam(label = "consumer", description = "Sets the maximum number of messages to poll")
+    private int maxMessagesPerPoll;
+    @UriParam(label = "consumer,advanced",
+            description = "Allows to plugin to use a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch.")
+    private SqlProcessingStrategy processingStrategy;
+    @UriParam(label = "advanced",
+            description = "Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement.")
+    private SqlPrepareStatementStrategy prepareStatementStrategy;
+    @UriParam(label = "consumer",
+            description = "After processing each row then this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameter.")
+    private String onConsume;
+    @UriParam(label = "consumer",
+            description = "After processing each row then this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameter.")
+    private String onConsumeFailed;
+    @UriParam(label = "consumer",
+            description = "After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters.")
+    private String onConsumeBatchComplete;
+    @UriParam(label = "consumer", defaultValue = "true",
+            description = "Sets how resultset should be delivered to route. Indicates delivery as either a list or individual object. defaults to true.")
+    private boolean useIterator = true;
+    @UriParam(label = "consumer",
+            description = "Sets whether empty resultset should be allowed to be sent to the next hop. Defaults to false. So the empty resultset will be filtered out.")
+    private boolean routeEmptyResultSet;
+    @UriParam(label = "consumer", defaultValue = "-1", description = "Sets an expected update count to validate when using onConsume.")
+    private int expectedUpdateCount = -1;
+    @UriParam(label = "consumer", description = "Sets whether to break batch if onConsume failed.")
+    private boolean breakBatchOnConsumeFail;
+    @UriParam(defaultValue = "true", description = "Whether to allow using named parameters in the queries.")
+    private boolean allowNamedParameters = true;
+    @UriParam(label = "producer,advanced",
+            description = "If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, "
+                    + "also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1"
+                    + " or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters.")
+    private boolean alwaysPopulateStatement;
+    @UriParam(defaultValue = ",",
+            description = "The separator to use when parameter values is taken from message body (if the body is a String type), to be inserted at # placeholders."
+            + "Notice if you use named parameters, then a Map type is used instead. The default value is ,")
+    private char separator = ',';
+    @UriParam(defaultValue = "SelectList", description = "Make the output of consumer or producer to SelectList as List of Map, or SelectOne as single Java object in the following way:"
+            + "a) If the query has only single column, then that JDBC Column object is returned. (such as SELECT COUNT( * ) FROM PROJECT will return a Long object."
+            + "b) If the query has more than one column, then it will return a Map of that result."
+            + "c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names."
+            + "It will assume your class has a default constructor to create instance with."
+            + "d) If the query resulted in more than one rows, it throws an non-unique result exception.")
+    private SqlOutputType outputType = SqlOutputType.SelectList;
+    @UriParam(description = "Specify the full package and class name to use as conversion when outputType=SelectOne.")
+    private String outputClass;
+    @UriParam(label = "producer,advanced", description = "If set greater than zero, then Camel will use this count value of parameters to replace instead of"
+            + " querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead.")
+    private int parametersCount;
+    @UriParam(label = "producer", description = "If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing")
+    private boolean noop;
+    @UriParam(description = "Store the query result in a header instead of the message body. By default, outputHeader == null and the query result is stored"
+            + " in the message body, any existing content in the message body is discarded. If outputHeader is set, the value is used as the name of the header"
+            + " to store the query result and the original message body is preserved.")
+    private String outputHeader;
+    @UriParam(label = "producer", description = "Whether to use the message body as the SQL and then headers for parameters. If this option is enabled then the SQL in the uri is not used.")
+    private boolean useMessageBodyForSql;
+
+    public DefaultSqlEndpoint() {
+    }
+
+    public DefaultSqlEndpoint(String uri, Component component, JdbcTemplate jdbcTemplate) {
+        super(uri, component);
+        this.jdbcTemplate = jdbcTemplate;
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public JdbcTemplate getJdbcTemplate() {
+        return jdbcTemplate;
+    }
+
+    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+    }
+
+    public boolean isBatch() {
+        return batch;
+    }
+
+    /**
+     * Enables or disables batch mode
+     */
+    public void setBatch(boolean batch) {
+        this.batch = batch;
+    }
+
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    /**
+     * Sets the maximum number of messages to poll
+     */
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
+    public SqlProcessingStrategy getProcessingStrategy() {
+        return processingStrategy;
+    }
+
+    /**
+     * Allows to plugin to use a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch.
+     */
+    public void setProcessingStrategy(SqlProcessingStrategy processingStrategy) {
+        this.processingStrategy = processingStrategy;
+    }
+
+    public SqlPrepareStatementStrategy getPrepareStatementStrategy() {
+        return prepareStatementStrategy;
+    }
+
+    /**
+     * Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement.
+     */
+    public void setPrepareStatementStrategy(SqlPrepareStatementStrategy prepareStatementStrategy) {
+        this.prepareStatementStrategy = prepareStatementStrategy;
+    }
+
+    public String getOnConsume() {
+        return onConsume;
+    }
+
+    /**
+     * After processing each row then this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameter.
+     */
+    public void setOnConsume(String onConsume) {
+        this.onConsume = onConsume;
+    }
+
+    public String getOnConsumeFailed() {
+        return onConsumeFailed;
+    }
+
+    /**
+     * After processing each row then this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameter.
+     */
+    public void setOnConsumeFailed(String onConsumeFailed) {
+        this.onConsumeFailed = onConsumeFailed;
+    }
+
+    public String getOnConsumeBatchComplete() {
+        return onConsumeBatchComplete;
+    }
+
+    /**
+     * After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters.
+     */
+    public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
+        this.onConsumeBatchComplete = onConsumeBatchComplete;
+    }
+
+    public boolean isAllowNamedParameters() {
+        return allowNamedParameters;
+    }
+
+    /**
+     * Whether to allow using named parameters in the queries.
+     */
+    public void setAllowNamedParameters(boolean allowNamedParameters) {
+        this.allowNamedParameters = allowNamedParameters;
+    }
+
+    public boolean isAlwaysPopulateStatement() {
+        return alwaysPopulateStatement;
+    }
+
+    /**
+     * If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked,
+     * also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there
+     * is 1 or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters.
+     */
+    public void setAlwaysPopulateStatement(boolean alwaysPopulateStatement) {
+        this.alwaysPopulateStatement = alwaysPopulateStatement;
+    }
+
+    public char getSeparator() {
+        return separator;
+    }
+
+    /**
+     * The separator to use when parameter values is taken from message body (if the body is a String type), to be inserted at # placeholders.
+     * Notice if you use named parameters, then a Map type is used instead.
+     * <p/>
+     * The default value is ,
+     */
+    public void setSeparator(char separator) {
+        this.separator = separator;
+    }
+
+    public SqlOutputType getOutputType() {
+        return outputType;
+    }
+
+    /**
+     * Make the output of consumer or producer to SelectList as List of Map, or SelectOne as single Java object in the following way:
+     * a) If the query has only single column, then that JDBC Column object is returned. (such as SELECT COUNT( * ) FROM PROJECT will return a Long object.
+     * b) If the query has more than one column, then it will return a Map of that result.
+     * c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names. 
+     * It will assume your class has a default constructor to create instance with.
+     * d) If the query resulted in more than one rows, it throws an non-unique result exception.
+     */
+    public void setOutputType(SqlOutputType outputType) {
+        this.outputType = outputType;
+    }
+
+    public String getOutputClass() {
+        return outputClass;
+    }
+
+    /**
+     * Specify the full package and class name to use as conversion when outputType=SelectOne.
+     */
+    public void setOutputClass(String outputClass) {
+        this.outputClass = outputClass;
+    }
+
+    public int getParametersCount() {
+        return parametersCount;
+    }
+
+    /**
+     * If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via JDBC metadata API.
+     * This is useful if the JDBC vendor could not return correct parameters count, then user may override instead.
+     */
+    public void setParametersCount(int parametersCount) {
+        this.parametersCount = parametersCount;
+    }
+
+    public boolean isNoop() {
+        return noop;
+    }
+
+    /**
+     * If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing
+     */
+    public void setNoop(boolean noop) {
+        this.noop = noop;
+    }
+
+    public String getOutputHeader() {
+        return outputHeader;
+    }
+
+    /**
+     * Store the query result in a header instead of the message body.
+     * By default, outputHeader == null and the query result is stored in the message body,
+     * any existing content in the message body is discarded.
+     * If outputHeader is set, the value is used as the name of the header to store the
+     * query result and the original message body is preserved.
+     */
+    public void setOutputHeader(String outputHeader) {
+        this.outputHeader = outputHeader;
+    }
+
+    public boolean isUseMessageBodyForSql() {
+        return useMessageBodyForSql;
+    }
+
+    /**
+     * Whether to use the message body as the SQL and then headers for parameters.
+     * <p/>
+     * If this option is enabled then the SQL in the uri is not used.
+     */
+    public void setUseMessageBodyForSql(boolean useMessageBodyForSql) {
+        this.useMessageBodyForSql = useMessageBodyForSql;
+    }
+
+    public String getDataSourceRef() {
+        return dataSourceRef;
+    }
+
+    /**
+     * Sets the reference to a DataSource to lookup from the registry, to use for communicating with the database.
+     */
+    public void setDataSourceRef(String dataSourceRef) {
+        this.dataSourceRef = dataSourceRef;
+    }
+
+    public DataSource getDataSource() {
+        return dataSource;
+    }
+
+    /**
+     * Sets the DataSource to use to communicate with the database.
+     */
+    public void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    public boolean isUseIterator() {
+        return useIterator;
+    }
+
+    /**
+     * Sets how resultset should be delivered to route. Indicates delivery as either a list or individual object. defaults to true.
+     */
+    public void setUseIterator(boolean useIterator) {
+        this.useIterator = useIterator;
+    }
+
+    public boolean isRouteEmptyResultSet() {
+        return routeEmptyResultSet;
+    }
+
+    /**
+     * Sets whether empty resultset should be allowed to be sent to the next hop.
+     * Defaults to false. So the empty resultset will be filtered out.
+     */
+    public void setRouteEmptyResultSet(boolean routeEmptyResultSet) {
+        this.routeEmptyResultSet = routeEmptyResultSet;
+    }
+
+    public int getExpectedUpdateCount() {
+        return expectedUpdateCount;
+    }
+
+    /**
+     * Sets an expected update count to validate when using onConsume.
+     */
+    public void setExpectedUpdateCount(int expectedUpdateCount) {
+        this.expectedUpdateCount = expectedUpdateCount;
+    }
+
+    public boolean isBreakBatchOnConsumeFail() {
+        return breakBatchOnConsumeFail;
+    }
+
+    /**
+     * Sets whether to break batch if onConsume failed.
+     */
+    public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
+        this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<?> queryForList(ResultSet rs, boolean allowMapToClass) throws SQLException {
+        if (allowMapToClass && outputClass != null) {
+            Class<?> outputClazz = getCamelContext().getClassResolver().resolveClass(outputClass);
+            RowMapper rowMapper = new BeanPropertyRowMapper(outputClazz);
+            RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper);
+            List<?> data = mapper.extractData(rs);
+            return data;
+        } else {
+            ColumnMapRowMapper rowMapper = new ColumnMapRowMapper();
+            RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(rowMapper);
+            List<Map<String, Object>> data = mapper.extractData(rs);
+            return data;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public Object queryForObject(ResultSet rs) throws SQLException {
+        Object result = null;
+        if (outputClass == null) {
+            RowMapper rowMapper = new ColumnMapRowMapper();
+            RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(rowMapper);
+            List<Map<String, Object>> data = mapper.extractData(rs);
+            if (data.size() > 1) {
+                throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() +  " count instead.");
+            } else if (data.size() == 1) {
+                // Set content depend on number of column from query result
+                Map<String, Object> row = data.get(0);
+                if (row.size() == 1) {
+                    result = row.values().iterator().next();
+                } else {
+                    result = row;
+                }
+            }
+        } else {
+            Class<?> outputClzz = getCamelContext().getClassResolver().resolveClass(outputClass);
+            RowMapper rowMapper = new BeanPropertyRowMapper(outputClzz);
+            RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper);
+            List<?> data = mapper.extractData(rs);
+            if (data.size() > 1) {
+                throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() +  " count instead.");
+            } else if (data.size() == 1) {
+                result = data.get(0);
+            }
+        }
+
+        // If data.size is zero, let result be null.
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 29c3e07..3774dc2 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -59,7 +59,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
-    public SqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query,
+    public SqlConsumer(DefaultSqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query,
                        SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
         super(endpoint, processor);
         this.jdbcTemplate = jdbcTemplate;

http://git-wip-us.apache.org/repos/asf/camel/blob/0b71c42f/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
index 761c125..fdf049c 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
@@ -16,29 +16,15 @@
  */
 package org.apache.camel.component.sql;
 
-import java.sql.ResultSet;
-import java.sql.SQLDataException;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-
-import javax.sql.DataSource;
-
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultPollingEndpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
-import org.springframework.jdbc.core.BeanPropertyRowMapper;
-import org.springframework.jdbc.core.ColumnMapRowMapper;
 import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.core.RowMapper;
-import org.springframework.jdbc.core.RowMapperResultSetExtractor;
 
 /**
  * SQL Endpoint. Endpoint URI should contain valid SQL statement, but instead of
@@ -46,91 +32,23 @@ import org.springframework.jdbc.core.RowMapperResultSetExtractor;
  * This is because in camel question mark has other meaning.
  */
 @UriEndpoint(scheme = "sql", title = "SQL", syntax = "sql:query", consumerClass = SqlConsumer.class, label = "database,sql")
-public class SqlEndpoint extends DefaultPollingEndpoint {
-    private JdbcTemplate jdbcTemplate;
+public class SqlEndpoint extends DefaultSqlEndpoint {
 
     @UriPath(description = "Sets the SQL query to perform") @Metadata(required = "true")
-
     private String query;
-    @UriParam(description = "Sets the reference to a DataSource to lookup from the registry, to use for communicating with the database.")
-    @Deprecated
-    private String dataSourceRef;
-    @UriParam(description = "Sets the DataSource to use to communicate with the database.")
-    private DataSource dataSource;
-    @UriParam(label = "producer", description = "Enables or disables batch mode")
-    private boolean batch;
-    @UriParam(label = "consumer", description = "Sets the maximum number of messages to poll")
-    private int maxMessagesPerPoll;
-    @UriParam(label = "consumer,advanced",
-            description = "Allows to plugin to use a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch.")
-    private SqlProcessingStrategy processingStrategy;
-    @UriParam(label = "advanced",
-            description = "Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement.")
-    private SqlPrepareStatementStrategy prepareStatementStrategy;
-    @UriParam(label = "consumer",
-            description = "After processing each row then this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameter.")
-    private String onConsume;
-    @UriParam(label = "consumer",
-            description = "After processing each row then this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameter.")
-    private String onConsumeFailed;
-    @UriParam(label = "consumer",
-            description = "After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters.")
-    private String onConsumeBatchComplete;
-    @UriParam(label = "consumer", defaultValue = "true",
-            description = "Sets how resultset should be delivered to route. Indicates delivery as either a list or individual object. defaults to true.")
-    private boolean useIterator = true;
-    @UriParam(label = "consumer",
-            description = "Sets whether empty resultset should be allowed to be sent to the next hop. Defaults to false. So the empty resultset will be filtered out.")
-    private boolean routeEmptyResultSet;
-    @UriParam(label = "consumer", defaultValue = "-1", description = "Sets an expected update count to validate when using onConsume.")
-    private int expectedUpdateCount = -1;
-    @UriParam(label = "consumer", description = "Sets whether to break batch if onConsume failed.")
-    private boolean breakBatchOnConsumeFail;
-    @UriParam(defaultValue = "true", description = "Whether to allow using named parameters in the queries.")
-    private boolean allowNamedParameters = true;
-    @UriParam(label = "producer,advanced",
-            description = "If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, "
-                    + "also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there is 1"
-                    + " or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters.")
-    private boolean alwaysPopulateStatement;
-    @UriParam(defaultValue = ",",
-            description = "The separator to use when parameter values is taken from message body (if the body is a String type), to be inserted at # placeholders."
-            + "Notice if you use named parameters, then a Map type is used instead. The default value is ,")
-    private char separator = ',';
-    @UriParam(defaultValue = "SelectList", description = "Make the output of consumer or producer to SelectList as List of Map, or SelectOne as single Java object in the following way:"
-            + "a) If the query has only single column, then that JDBC Column object is returned. (such as SELECT COUNT( * ) FROM PROJECT will return a Long object."
-            + "b) If the query has more than one column, then it will return a Map of that result."
-            + "c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names."
-            + "It will assume your class has a default constructor to create instance with."
-            + "d) If the query resulted in more than one rows, it throws an non-unique result exception.")
-    private SqlOutputType outputType = SqlOutputType.SelectList;
-    @UriParam(description = "Specify the full package and class name to use as conversion when outputType=SelectOne.")
-    private String outputClass;
-    @UriParam(label = "producer,advanced", description = "If set greater than zero, then Camel will use this count value of parameters to replace instead of"
-            + " querying via JDBC metadata API. This is useful if the JDBC vendor could not return correct parameters count, then user may override instead.")
-    private int parametersCount;
-    @UriParam(label = "producer", description = "If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing")
-    private boolean noop;
-    @UriParam(description = "Store the query result in a header instead of the message body. By default, outputHeader == null and the query result is stored"
-            + " in the message body, any existing content in the message body is discarded. If outputHeader is set, the value is used as the name of the header"
-            + " to store the query result and the original message body is preserved.")
-    private String outputHeader;
-    @UriParam(label = "producer", description = "Whether to use the message body as the SQL and then headers for parameters. If this option is enabled then the SQL in the uri is not used.")
-    private boolean useMessageBodyForSql;
 
     public SqlEndpoint() {
     }
 
     public SqlEndpoint(String uri, Component component, JdbcTemplate jdbcTemplate, String query) {
-        super(uri, component);
-        this.jdbcTemplate = jdbcTemplate;
+        super(uri, component, jdbcTemplate);
         this.query = query;
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator);
-        SqlProcessingStrategy proStrategy = processingStrategy != null ? processingStrategy : new DefaultSqlProcessingStrategy(prepareStrategy);
-        SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query, prepareStrategy, proStrategy);
+        SqlPrepareStatementStrategy prepareStrategy = getPrepareStatementStrategy() != null ? getPrepareStatementStrategy() : new DefaultSqlPrepareStatementStrategy(getSeparator());
+        SqlProcessingStrategy proStrategy = getProcessingStrategy() != null ? getProcessingStrategy() : new DefaultSqlProcessingStrategy(prepareStrategy);
+        SqlConsumer consumer = new SqlConsumer(this, processor, getJdbcTemplate(), query, prepareStrategy, proStrategy);
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         consumer.setOnConsume(getOnConsume());
         consumer.setOnConsumeFailed(getOnConsumeFailed());
@@ -144,22 +62,16 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
     }
 
     public Producer createProducer() throws Exception {
-        SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator);
-        SqlProducer result = new SqlProducer(this, query, jdbcTemplate, prepareStrategy, batch, alwaysPopulateStatement, useMessageBodyForSql);
-        result.setParametersCount(parametersCount);
+        SqlPrepareStatementStrategy prepareStrategy = getPrepareStatementStrategy() != null ? getPrepareStatementStrategy() : new DefaultSqlPrepareStatementStrategy(getSeparator());
+        SqlProducer result = new SqlProducer(this, query, getJdbcTemplate(), prepareStrategy, isBatch(), isAlwaysPopulateStatement(), isUseMessageBodyForSql());
+        result.setParametersCount(getParametersCount());
         return result;
     }
 
-    public boolean isSingleton() {
-        return true;
-    }
-
-    public JdbcTemplate getJdbcTemplate() {
-        return jdbcTemplate;
-    }
-
-    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
-        this.jdbcTemplate = jdbcTemplate;
+    @Override
+    protected String createEndpointUri() {
+        // Make sure it's properly encoded
+        return "sql:" + UnsafeUriCharactersEncoder.encode(query);
     }
 
     public String getQuery() {
@@ -173,320 +85,4 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
         this.query = query;
     }
 
-    public boolean isBatch() {
-        return batch;
-    }
-
-    /**
-     * Enables or disables batch mode
-     */
-    public void setBatch(boolean batch) {
-        this.batch = batch;
-    }
-
-    public int getMaxMessagesPerPoll() {
-        return maxMessagesPerPoll;
-    }
-
-    /**
-     * Sets the maximum number of messages to poll
-     */
-    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
-        this.maxMessagesPerPoll = maxMessagesPerPoll;
-    }
-
-    public SqlProcessingStrategy getProcessingStrategy() {
-        return processingStrategy;
-    }
-
-    /**
-     * Allows to plugin to use a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch.
-     */
-    public void setProcessingStrategy(SqlProcessingStrategy processingStrategy) {
-        this.processingStrategy = processingStrategy;
-    }
-
-    public SqlPrepareStatementStrategy getPrepareStatementStrategy() {
-        return prepareStatementStrategy;
-    }
-
-    /**
-     * Allows to plugin to use a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement.
-     */
-    public void setPrepareStatementStrategy(SqlPrepareStatementStrategy prepareStatementStrategy) {
-        this.prepareStatementStrategy = prepareStatementStrategy;
-    }
-
-    public String getOnConsume() {
-        return onConsume;
-    }
-
-    /**
-     * After processing each row then this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameter.
-     */
-    public void setOnConsume(String onConsume) {
-        this.onConsume = onConsume;
-    }
-
-    public String getOnConsumeFailed() {
-        return onConsumeFailed;
-    }
-
-    /**
-     * After processing each row then this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameter.
-     */
-    public void setOnConsumeFailed(String onConsumeFailed) {
-        this.onConsumeFailed = onConsumeFailed;
-    }
-
-    public String getOnConsumeBatchComplete() {
-        return onConsumeBatchComplete;
-    }
-
-    /**
-     * After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters.
-     */
-    public void setOnConsumeBatchComplete(String onConsumeBatchComplete) {
-        this.onConsumeBatchComplete = onConsumeBatchComplete;
-    }
-
-    public boolean isAllowNamedParameters() {
-        return allowNamedParameters;
-    }
-
-    /**
-     * Whether to allow using named parameters in the queries.
-     */
-    public void setAllowNamedParameters(boolean allowNamedParameters) {
-        this.allowNamedParameters = allowNamedParameters;
-    }
-
-    public boolean isAlwaysPopulateStatement() {
-        return alwaysPopulateStatement;
-    }
-
-    /**
-     * If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked,
-     * also if there is no expected parameters to be prepared. When this is false then the populateStatement is only invoked if there
-     * is 1 or more expected parameters to be set; for example this avoids reading the message body/headers for SQL queries with no parameters.
-     */
-    public void setAlwaysPopulateStatement(boolean alwaysPopulateStatement) {
-        this.alwaysPopulateStatement = alwaysPopulateStatement;
-    }
-
-    public char getSeparator() {
-        return separator;
-    }
-
-    /**
-     * The separator to use when parameter values is taken from message body (if the body is a String type), to be inserted at # placeholders.
-     * Notice if you use named parameters, then a Map type is used instead.
-     * <p/>
-     * The default value is ,
-     */
-    public void setSeparator(char separator) {
-        this.separator = separator;
-    }
-
-    public SqlOutputType getOutputType() {
-        return outputType;
-    }
-
-    /**
-     * Make the output of consumer or producer to SelectList as List of Map, or SelectOne as single Java object in the following way:
-     * a) If the query has only single column, then that JDBC Column object is returned. (such as SELECT COUNT( * ) FROM PROJECT will return a Long object.
-     * b) If the query has more than one column, then it will return a Map of that result.
-     * c) If the outputClass is set, then it will convert the query result into an Java bean object by calling all the setters that match the column names. 
-     * It will assume your class has a default constructor to create instance with.
-     * d) If the query resulted in more than one rows, it throws an non-unique result exception.
-     */
-    public void setOutputType(SqlOutputType outputType) {
-        this.outputType = outputType;
-    }
-
-    public String getOutputClass() {
-        return outputClass;
-    }
-
-    /**
-     * Specify the full package and class name to use as conversion when outputType=SelectOne.
-     */
-    public void setOutputClass(String outputClass) {
-        this.outputClass = outputClass;
-    }
-
-    public int getParametersCount() {
-        return parametersCount;
-    }
-
-    /**
-     * If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via JDBC metadata API.
-     * This is useful if the JDBC vendor could not return correct parameters count, then user may override instead.
-     */
-    public void setParametersCount(int parametersCount) {
-        this.parametersCount = parametersCount;
-    }
-
-    public boolean isNoop() {
-        return noop;
-    }
-
-    /**
-     * If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing
-     */
-    public void setNoop(boolean noop) {
-        this.noop = noop;
-    }
-
-    public String getOutputHeader() {
-        return outputHeader;
-    }
-
-    /**
-     * Store the query result in a header instead of the message body.
-     * By default, outputHeader == null and the query result is stored in the message body,
-     * any existing content in the message body is discarded.
-     * If outputHeader is set, the value is used as the name of the header to store the
-     * query result and the original message body is preserved.
-     */
-    public void setOutputHeader(String outputHeader) {
-        this.outputHeader = outputHeader;
-    }
-
-    public boolean isUseMessageBodyForSql() {
-        return useMessageBodyForSql;
-    }
-
-    /**
-     * Whether to use the message body as the SQL and then headers for parameters.
-     * <p/>
-     * If this option is enabled then the SQL in the uri is not used.
-     */
-    public void setUseMessageBodyForSql(boolean useMessageBodyForSql) {
-        this.useMessageBodyForSql = useMessageBodyForSql;
-    }
-
-    public String getDataSourceRef() {
-        return dataSourceRef;
-    }
-
-    /**
-     * Sets the reference to a DataSource to lookup from the registry, to use for communicating with the database.
-     */
-    public void setDataSourceRef(String dataSourceRef) {
-        this.dataSourceRef = dataSourceRef;
-    }
-
-    public DataSource getDataSource() {
-        return dataSource;
-    }
-
-    /**
-     * Sets the DataSource to use to communicate with the database.
-     */
-    public void setDataSource(DataSource dataSource) {
-        this.dataSource = dataSource;
-    }
-
-    public boolean isUseIterator() {
-        return useIterator;
-    }
-
-    /**
-     * Sets how resultset should be delivered to route. Indicates delivery as either a list or individual object. defaults to true.
-     */
-    public void setUseIterator(boolean useIterator) {
-        this.useIterator = useIterator;
-    }
-
-    public boolean isRouteEmptyResultSet() {
-        return routeEmptyResultSet;
-    }
-
-    /**
-     * Sets whether empty resultset should be allowed to be sent to the next hop.
-     * Defaults to false. So the empty resultset will be filtered out.
-     */
-    public void setRouteEmptyResultSet(boolean routeEmptyResultSet) {
-        this.routeEmptyResultSet = routeEmptyResultSet;
-    }
-
-    public int getExpectedUpdateCount() {
-        return expectedUpdateCount;
-    }
-
-    /**
-     * Sets an expected update count to validate when using onConsume.
-     */
-    public void setExpectedUpdateCount(int expectedUpdateCount) {
-        this.expectedUpdateCount = expectedUpdateCount;
-    }
-
-    public boolean isBreakBatchOnConsumeFail() {
-        return breakBatchOnConsumeFail;
-    }
-
-    /**
-     * Sets whether to break batch if onConsume failed.
-     */
-    public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
-        this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
-    }
-
-    @Override
-    protected String createEndpointUri() {
-        // Make sure it's properly encoded
-        return "sql:" + UnsafeUriCharactersEncoder.encode(query);
-    }
-
-    @SuppressWarnings("unchecked")
-    public List<?> queryForList(ResultSet rs, boolean allowMapToClass) throws SQLException {
-        if (allowMapToClass && outputClass != null) {
-            Class<?> outputClazz = getCamelContext().getClassResolver().resolveClass(outputClass);
-            RowMapper rowMapper = new BeanPropertyRowMapper(outputClazz);
-            RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper);
-            List<?> data = mapper.extractData(rs);
-            return data;
-        } else {
-            ColumnMapRowMapper rowMapper = new ColumnMapRowMapper();
-            RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(rowMapper);
-            List<Map<String, Object>> data = mapper.extractData(rs);
-            return data;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public Object queryForObject(ResultSet rs) throws SQLException {
-        Object result = null;
-        if (outputClass == null) {
-            RowMapper rowMapper = new ColumnMapRowMapper();
-            RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(rowMapper);
-            List<Map<String, Object>> data = mapper.extractData(rs);
-            if (data.size() > 1) {
-                throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() +  " count instead.");
-            } else if (data.size() == 1) {
-                // Set content depend on number of column from query result
-                Map<String, Object> row = data.get(0);
-                if (row.size() == 1) {
-                    result = row.values().iterator().next();
-                } else {
-                    result = row;
-                }
-            }
-        } else {
-            Class<?> outputClzz = getCamelContext().getClassResolver().resolveClass(outputClass);
-            RowMapper rowMapper = new BeanPropertyRowMapper(outputClzz);
-            RowMapperResultSetExtractor<?> mapper = new RowMapperResultSetExtractor(rowMapper);
-            List<?> data = mapper.extractData(rs);
-            if (data.size() > 1) {
-                throw new SQLDataException("Query result not unique for outputType=SelectOne. Got " + data.size() +  " count instead.");
-            } else if (data.size() == 1) {
-                result = data.get(0);
-            }
-        }
-
-        // If data.size is zero, let result be null.
-        return result;
-    }
-
 }