You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/03 16:01:28 UTC
[2/2] git commit: CAMEL-6418: Added separator option. Fixed and
improved when using String as body with # placeholders to honor quotes and use
new separator option.
CAMEL-6418: Added separator option. Fixed and improved when using String as body with # placeholders to honor quotes and use new separator option.
Conflicts:
components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6f10a76b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6f10a76b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6f10a76b
Branch: refs/heads/camel-2.11.x
Commit: 6f10a76bcd28939dddf96a231d35a492f68ade79
Parents: 51501fd
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jun 3 15:48:00 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jun 3 16:01:09 2013 +0200
----------------------------------------------------------------------
.../sql/DefaultSqlPrepareStatementStrategy.java | 24 ++++-
.../sql/DefaultSqlProcessingStrategy.java | 13 ++-
.../apache/camel/component/sql/SqlConsumer.java | 13 ++-
.../apache/camel/component/sql/SqlEndpoint.java | 20 +++-
.../apache/camel/component/sql/SqlProducer.java | 15 ++-
.../component/sql/SqlProducerSeparatorTest.java | 79 +++++++++++++++
6 files changed, 144 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
index 0540c1d..5928f64 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
@@ -18,13 +18,16 @@ package org.apache.camel.component.sql;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.util.StringQuoteHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +37,15 @@ import org.slf4j.LoggerFactory;
public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementStrategy {
private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlPrepareStatementStrategy.class);
+ private final char separator;
+
+ public DefaultSqlPrepareStatementStrategy() {
+ this(',');
+ }
+
+ public DefaultSqlPrepareStatementStrategy(char separator) {
+ this.separator = separator;
+ }
@Override
public String prepareQuery(String query, boolean allowNamedParameters) throws SQLException {
@@ -111,8 +123,16 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt
} else {
- // just use a regular iterator
- return exchange.getContext().getTypeConverter().convertTo(Iterator.class, value);
+ // is the body a String
+ if (value instanceof String) {
+ // if the body is a String then honor quotes etc.
+ String[] tokens = StringQuoteHelper.splitSafeQuote((String)value, separator, true);
+ List<String> list = Arrays.asList(tokens);
+ return list.iterator();
+ } else {
+ // just use a regular iterator
+ return exchange.getContext().getTypeConverter().convertTo(Iterator.class, value);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
index 787c3d6..d641c0f 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
@@ -32,19 +32,24 @@ import org.springframework.jdbc.core.PreparedStatementCallback;
public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
+ private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
+
+ public DefaultSqlProcessingStrategy(SqlPrepareStatementStrategy sqlPrepareStatementStrategy) {
+ this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
+ }
@Override
public int commit(final SqlEndpoint endpoint, final Exchange exchange, final Object data, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
- final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters());
+ final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters());
return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
int expected = ps.getParameterMetaData().getParameterCount();
- Iterator<?> iterator = endpoint.getPrepareStatementStrategy().createPopulateIterator(query, preparedQuery, expected, exchange, data);
+ Iterator<?> iterator = sqlPrepareStatementStrategy.createPopulateIterator(query, preparedQuery, expected, exchange, data);
if (iterator != null) {
- endpoint.getPrepareStatementStrategy().populateStatement(ps, iterator, expected);
+ sqlPrepareStatementStrategy.populateStatement(ps, iterator, expected);
LOG.trace("Execute query {}", query);
ps.execute();
@@ -62,7 +67,7 @@ public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
@Override
public int commitBatchComplete(final SqlEndpoint endpoint, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
- final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters());
+ final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters());
return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 89643be..5335f76 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -44,6 +44,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
private final String query;
private final JdbcTemplate jdbcTemplate;
+ private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
+ private final SqlProcessingStrategy sqlProcessingStrategy;
private String onConsume;
private String onConsumeFailed;
@@ -61,10 +63,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
}
}
- public SqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query) {
+ public SqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query,
+ SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
super(endpoint, processor);
this.jdbcTemplate = jdbcTemplate;
this.query = query;
+ this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
+ this.sqlProcessingStrategy = sqlProcessingStrategy;
}
@Override
@@ -78,7 +83,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
shutdownRunningTask = null;
pendingExchanges = 0;
- final String preparedQuery = getEndpoint().getPrepareStatementStrategy().prepareQuery(query, getEndpoint().isAllowNamedParameters());
+ final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, getEndpoint().isAllowNamedParameters());
Integer messagePolled = jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
@Override
@@ -171,7 +176,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
try {
// we can only run on consume if there was data
if (data != null && sql != null) {
- int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
+ int updateCount = sqlProcessingStrategy.commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + sql;
throw new SQLException(msg);
@@ -188,7 +193,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
try {
if (onConsumeBatchComplete != null) {
- int updateCount = getEndpoint().getProcessingStrategy().commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete);
+ int updateCount = sqlProcessingStrategy.commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete);
log.debug("onConsumeBatchComplete update count {}", updateCount);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
index 06249c8..70c70e4 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
@@ -34,13 +34,14 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
private String query;
private boolean batch;
private int maxMessagesPerPoll;
- private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
- private SqlPrepareStatementStrategy prepareStatementStrategy = new DefaultSqlPrepareStatementStrategy();
+ private SqlProcessingStrategy processingStrategy;
+ private SqlPrepareStatementStrategy prepareStatementStrategy;
private String onConsume;
private String onConsumeFailed;
private String onConsumeBatchComplete;
private boolean allowNamedParameters = true;
private boolean alwaysPopulateStatement;
+ private char separator = ',';
public SqlEndpoint() {
}
@@ -52,7 +53,9 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
}
public Consumer createConsumer(Processor processor) throws Exception {
- SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
+ SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator);
+ SqlProcessingStrategy proStrategy = processingStrategy != null ? processingStrategy : new DefaultSqlProcessingStrategy(prepareStrategy);
+ SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query, prepareStrategy, proStrategy);
consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
consumer.setOnConsume(getOnConsume());
consumer.setOnConsumeFailed(getOnConsumeFailed());
@@ -62,7 +65,8 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
}
public Producer createProducer() throws Exception {
- return new SqlProducer(this, query, jdbcTemplate, batch, alwaysPopulateStatement);
+ SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator);
+ return new SqlProducer(this, query, jdbcTemplate, prepareStrategy, batch, alwaysPopulateStatement);
}
public boolean isSingleton() {
@@ -157,6 +161,14 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
this.alwaysPopulateStatement = alwaysPopulateStatement;
}
+ public char getSeparator() {
+ return separator;
+ }
+
+ public void setSeparator(char separator) {
+ this.separator = separator;
+ }
+
@Override
protected String createEndpointUri() {
// Make sure it's properly encoded
http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index 1fa6864..97ff150 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -34,10 +34,13 @@ public class SqlProducer extends DefaultProducer {
private JdbcTemplate jdbcTemplate;
private boolean batch;
private boolean alwaysPopulateStatement;
+ private SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
- public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, boolean batch, boolean alwaysPopulateStatement) {
+ public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, SqlPrepareStatementStrategy sqlPrepareStatementStrategy,
+ boolean batch, boolean alwaysPopulateStatement) {
super(endpoint);
this.jdbcTemplate = jdbcTemplate;
+ this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
this.query = query;
this.batch = batch;
this.alwaysPopulateStatement = alwaysPopulateStatement;
@@ -52,7 +55,7 @@ public class SqlProducer extends DefaultProducer {
String queryHeader = exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class);
final String sql = queryHeader != null ? queryHeader : query;
- final String preparedQuery = getEndpoint().getPrepareStatementStrategy().prepareQuery(sql, getEndpoint().isAllowNamedParameters());
+ final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sql, getEndpoint().isAllowNamedParameters());
jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Map<?, ?>>() {
public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
@@ -65,13 +68,13 @@ public class SqlProducer extends DefaultProducer {
Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
while (iterator != null && iterator.hasNext()) {
Object value = iterator.next();
- Iterator<?> i = getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, preparedQuery, expected, exchange, value);
- getEndpoint().getPrepareStatementStrategy().populateStatement(ps, i, expected);
+ Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+ sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
ps.addBatch();
}
} else {
- Iterator<?> i = getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody());
- getEndpoint().getPrepareStatementStrategy().populateStatement(ps, i, expected);
+ Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody());
+ sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java
new file mode 100755
index 0000000..3182cd1
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ * @version
+ */
+public class SqlProducerSeparatorTest extends CamelTestSupport {
+
+ private EmbeddedDatabase db;
+ private JdbcTemplate jdbcTemplate;
+
+ @Before
+ public void setUp() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+ .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+ jdbcTemplate = new JdbcTemplate(db);
+
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ db.shutdown();
+ }
+
+ @Test
+ public void testSeparator() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "4;'Food, Inc';'LGPL'");
+
+ mock.assertIsSatisfied();
+
+ assertEquals(4, jdbcTemplate.queryForInt("select count(*) from projects"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+ from("direct:start")
+ .to("sql:insert into projects (id, project, license) values (#, #, #)?separator=;")
+ .to("mock:result");
+ }
+ };
+ }
+}