You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/03 16:01:28 UTC

[2/2] git commit: CAMEL-6418: Added separator option. Fixed and improved when using String as body with # placeholders to honor quotes and use new separator option.

CAMEL-6418: Added separator option. Fixed and improved when using String as body with # placeholders to honor quotes and use new separator option.

Conflicts:
	components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6f10a76b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6f10a76b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6f10a76b

Branch: refs/heads/camel-2.11.x
Commit: 6f10a76bcd28939dddf96a231d35a492f68ade79
Parents: 51501fd
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jun 3 15:48:00 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jun 3 16:01:09 2013 +0200

----------------------------------------------------------------------
 .../sql/DefaultSqlPrepareStatementStrategy.java    |   24 ++++-
 .../sql/DefaultSqlProcessingStrategy.java          |   13 ++-
 .../apache/camel/component/sql/SqlConsumer.java    |   13 ++-
 .../apache/camel/component/sql/SqlEndpoint.java    |   20 +++-
 .../apache/camel/component/sql/SqlProducer.java    |   15 ++-
 .../component/sql/SqlProducerSeparatorTest.java    |   79 +++++++++++++++
 6 files changed, 144 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
index 0540c1d..5928f64 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlPrepareStatementStrategy.java
@@ -18,13 +18,16 @@ package org.apache.camel.component.sql;
 
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.util.StringQuoteHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +37,15 @@ import org.slf4j.LoggerFactory;
 public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementStrategy {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlPrepareStatementStrategy.class);
+    private final char separator;
+
+    public DefaultSqlPrepareStatementStrategy() {
+        this(',');
+    }
+
+    public DefaultSqlPrepareStatementStrategy(char separator) {
+        this.separator = separator;
+    }
 
     @Override
     public String prepareQuery(String query, boolean allowNamedParameters) throws SQLException {
@@ -111,8 +123,16 @@ public class DefaultSqlPrepareStatementStrategy implements SqlPrepareStatementSt
 
 
         } else {
-            // just use a regular iterator
-            return exchange.getContext().getTypeConverter().convertTo(Iterator.class, value);
+            // is the body a String
+            if (value instanceof String) {
+                // if the body is a String then honor quotes etc.
+                String[] tokens = StringQuoteHelper.splitSafeQuote((String)value, separator, true);
+                List<String> list = Arrays.asList(tokens);
+                return list.iterator();
+            } else {
+                // just use a regular iterator
+                return exchange.getContext().getTypeConverter().convertTo(Iterator.class, value);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
index 787c3d6..d641c0f 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
@@ -32,19 +32,24 @@ import org.springframework.jdbc.core.PreparedStatementCallback;
 public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
+    private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
+
+    public DefaultSqlProcessingStrategy(SqlPrepareStatementStrategy sqlPrepareStatementStrategy) {
+        this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
+    }
 
     @Override
     public int commit(final SqlEndpoint endpoint, final Exchange exchange, final Object data, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
 
-        final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters());
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters());
 
         return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
             public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
                 int expected = ps.getParameterMetaData().getParameterCount();
 
-                Iterator<?> iterator = endpoint.getPrepareStatementStrategy().createPopulateIterator(query, preparedQuery, expected, exchange, data);
+                Iterator<?> iterator = sqlPrepareStatementStrategy.createPopulateIterator(query, preparedQuery, expected, exchange, data);
                 if (iterator != null) {
-                    endpoint.getPrepareStatementStrategy().populateStatement(ps, iterator, expected);
+                    sqlPrepareStatementStrategy.populateStatement(ps, iterator, expected);
                     LOG.trace("Execute query {}", query);
                     ps.execute();
 
@@ -62,7 +67,7 @@ public class DefaultSqlProcessingStrategy implements SqlProcessingStrategy {
 
     @Override
     public int commitBatchComplete(final SqlEndpoint endpoint, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
-        final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters());
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, endpoint.isAllowNamedParameters());
 
         return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
             public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {

http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 89643be..5335f76 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -44,6 +44,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
 
     private final String query;
     private final JdbcTemplate jdbcTemplate;
+    private final SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
+    private final SqlProcessingStrategy sqlProcessingStrategy;
 
     private String onConsume;
     private String onConsumeFailed;
@@ -61,10 +63,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
-    public SqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query) {
+    public SqlConsumer(SqlEndpoint endpoint, Processor processor, JdbcTemplate jdbcTemplate, String query,
+                       SqlPrepareStatementStrategy sqlPrepareStatementStrategy, SqlProcessingStrategy sqlProcessingStrategy) {
         super(endpoint, processor);
         this.jdbcTemplate = jdbcTemplate;
         this.query = query;
+        this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
+        this.sqlProcessingStrategy = sqlProcessingStrategy;
     }
 
     @Override
@@ -78,7 +83,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
         shutdownRunningTask = null;
         pendingExchanges = 0;
 
-        final String preparedQuery = getEndpoint().getPrepareStatementStrategy().prepareQuery(query, getEndpoint().isAllowNamedParameters());
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(query, getEndpoint().isAllowNamedParameters());
 
         Integer messagePolled = jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
             @Override
@@ -171,7 +176,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
             try {
                 // we can only run on consume if there was data
                 if (data != null && sql != null) {
-                    int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
+                    int updateCount = sqlProcessingStrategy.commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
                     if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
                         String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + sql;
                         throw new SQLException(msg);
@@ -188,7 +193,7 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
 
         try {
             if (onConsumeBatchComplete != null) {
-                int updateCount = getEndpoint().getProcessingStrategy().commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete);
+                int updateCount = sqlProcessingStrategy.commitBatchComplete(getEndpoint(), jdbcTemplate, onConsumeBatchComplete);
                 log.debug("onConsumeBatchComplete update count {}", updateCount);
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
index 06249c8..70c70e4 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
@@ -34,13 +34,14 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
     private String query;
     private boolean batch;
     private int maxMessagesPerPoll;
-    private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
-    private SqlPrepareStatementStrategy prepareStatementStrategy = new DefaultSqlPrepareStatementStrategy();
+    private SqlProcessingStrategy processingStrategy;
+    private SqlPrepareStatementStrategy prepareStatementStrategy;
     private String onConsume;
     private String onConsumeFailed;
     private String onConsumeBatchComplete;
     private boolean allowNamedParameters = true;
     private boolean alwaysPopulateStatement;
+    private char separator = ',';
 
     public SqlEndpoint() {
     }
@@ -52,7 +53,9 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
+        SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator);
+        SqlProcessingStrategy proStrategy = processingStrategy != null ? processingStrategy : new DefaultSqlProcessingStrategy(prepareStrategy);
+        SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query, prepareStrategy, proStrategy);
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         consumer.setOnConsume(getOnConsume());
         consumer.setOnConsumeFailed(getOnConsumeFailed());
@@ -62,7 +65,8 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
     }
 
     public Producer createProducer() throws Exception {
-        return new SqlProducer(this, query, jdbcTemplate, batch, alwaysPopulateStatement);
+        SqlPrepareStatementStrategy prepareStrategy = prepareStatementStrategy != null ? prepareStatementStrategy : new DefaultSqlPrepareStatementStrategy(separator);
+        return new SqlProducer(this, query, jdbcTemplate, prepareStrategy, batch, alwaysPopulateStatement);
     }
 
     public boolean isSingleton() {
@@ -157,6 +161,14 @@ public class SqlEndpoint extends DefaultPollingEndpoint {
         this.alwaysPopulateStatement = alwaysPopulateStatement;
     }
 
+    public char getSeparator() {
+        return separator;
+    }
+
+    public void setSeparator(char separator) {
+        this.separator = separator;
+    }
+
     @Override
     protected String createEndpointUri() {
         // Make sure it's properly encoded

http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
index 1fa6864..97ff150 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
@@ -34,10 +34,13 @@ public class SqlProducer extends DefaultProducer {
     private JdbcTemplate jdbcTemplate;
     private boolean batch;
     private boolean alwaysPopulateStatement;
+    private SqlPrepareStatementStrategy sqlPrepareStatementStrategy;
 
-    public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, boolean batch, boolean alwaysPopulateStatement) {
+    public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, SqlPrepareStatementStrategy sqlPrepareStatementStrategy,
+                       boolean batch, boolean alwaysPopulateStatement) {
         super(endpoint);
         this.jdbcTemplate = jdbcTemplate;
+        this.sqlPrepareStatementStrategy = sqlPrepareStatementStrategy;
         this.query = query;
         this.batch = batch;
         this.alwaysPopulateStatement = alwaysPopulateStatement;
@@ -52,7 +55,7 @@ public class SqlProducer extends DefaultProducer {
         String queryHeader = exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class);
 
         final String sql = queryHeader != null ? queryHeader : query;
-        final String preparedQuery = getEndpoint().getPrepareStatementStrategy().prepareQuery(sql, getEndpoint().isAllowNamedParameters());
+        final String preparedQuery = sqlPrepareStatementStrategy.prepareQuery(sql, getEndpoint().isAllowNamedParameters());
 
         jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Map<?, ?>>() {
             public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
@@ -65,13 +68,13 @@ public class SqlProducer extends DefaultProducer {
                         Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
                         while (iterator != null && iterator.hasNext()) {
                             Object value = iterator.next();
-                            Iterator<?> i = getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, preparedQuery, expected, exchange, value);
-                            getEndpoint().getPrepareStatementStrategy().populateStatement(ps, i, expected);
+                            Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, value);
+                            sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
                             ps.addBatch();
                         }
                     } else {
-                        Iterator<?> i = getEndpoint().getPrepareStatementStrategy().createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody());
-                        getEndpoint().getPrepareStatementStrategy().populateStatement(ps, i, expected);
+                        Iterator<?> i = sqlPrepareStatementStrategy.createPopulateIterator(sql, preparedQuery, expected, exchange, exchange.getIn().getBody());
+                        sqlPrepareStatementStrategy.populateStatement(ps, i, expected);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/6f10a76b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java
new file mode 100755
index 0000000..3182cd1
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerSeparatorTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+/**
+ * @version 
+ */
+public class SqlProducerSeparatorTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+    private JdbcTemplate jdbcTemplate;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+            .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+
+        jdbcTemplate = new JdbcTemplate(db);
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        
+        db.shutdown();
+    }
+
+    @Test
+    public void testSeparator() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:start", "4;'Food, Inc';'LGPL'");
+
+        mock.assertIsSatisfied();
+
+        assertEquals(4, jdbcTemplate.queryForInt("select count(*) from projects"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("direct:start")
+                    .to("sql:insert into projects (id, project, license) values (#, #, #)?separator=;")
+                    .to("mock:result");
+            }
+        };
+    }
+}