You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/01/17 14:22:05 UTC

svn commit: r1434662 - in /camel/trunk/components/camel-sql/src: main/java/org/apache/camel/component/sql/ test/java/org/apache/camel/component/sql/ test/resources/

Author: davsclaus
Date: Thu Jan 17 13:22:05 2013
New Revision: 1434662

URL: http://svn.apache.org/viewvc?rev=1434662&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after processing etc.

Modified:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
    camel/trunk/components/camel-sql/src/test/resources/log4j.properties

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java Thu Jan 17 13:22:05 2013
@@ -19,7 +19,6 @@ package org.apache.camel.component.sql;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Iterator;
-import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.slf4j.Logger;
@@ -35,75 +34,30 @@ public class DefaultSqlProcessingStrateg
     private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
 
     @Override
-    public void commit(SqlEndpoint endpoint, final Exchange exchange, Object data, JdbcTemplate jdbcTemplate, final String query) throws Exception {
-        jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>() {
-            public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
+    public int commit(final SqlEndpoint endpoint, final Exchange exchange, final Object data, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
+
+        final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters());
+
+        return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
+            public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
                 int expected = ps.getParameterMetaData().getParameterCount();
 
-                Iterator<?> iterator = createIterator(exchange, query, expected);
+                Iterator<?> iterator = endpoint.getPrepareStatementStrategy().createPopulateIterator(query, preparedQuery, expected, exchange, data);
                 if (iterator != null) {
-                    populateStatement(ps, iterator, expected);
+                    endpoint.getPrepareStatementStrategy().populateStatement(ps, iterator, expected);
                     LOG.trace("Execute query {}", query);
                     ps.execute();
-                }
-
-                return null;
-            };
-        });
-    }
 
-    private Iterator<?> createIterator(Exchange exchange, final String query, final int expectedParams) {
-        Object body = exchange.getIn().getBody();
-        if (body == null) {
-            return null;
-        }
-
-        // TODO: support named parameters
-/*
-        if (body instanceof Map) {
-            final Map map = (Map) body;
-            return new Iterator() {
-
-                private int current;
-
-                @Override
-                public boolean hasNext() {
-                    return current < expectedParams;
+                    int updateCount = ps.getUpdateCount();
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Update count {}", updateCount);
+                    }
+                    return updateCount;
                 }
 
-                @Override
-                public Object next() {
-                    current++;
-                    // TODO: Fix me
-                    return map.get("ID");
-                }
-
-                @Override
-                public void remove() {
-                    // noop
-                }
+                return 0;
             };
-        }*/
-
-        // else force as iterator based
-        Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
-        return iterator;
-    }
-
-    private void populateStatement(PreparedStatement ps, Iterator<?> iterator, int expectedParams) throws SQLException {
-        int argNumber = 1;
-        if (expectedParams > 0) {
-            while (iterator != null && iterator.hasNext()) {
-                Object value = iterator.next();
-                LOG.trace("Setting parameter #{} with value: {}", argNumber, value);
-                ps.setObject(argNumber, value);
-                argNumber++;
-            }
-        }
-
-        if (argNumber - 1 != expectedParams) {
-            throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was:" + (argNumber - 1));
-        }
+        });
     }
 
 }

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java Thu Jan 17 13:22:05 2013
@@ -51,7 +51,18 @@ public class SqlComponent extends Defaul
         IntrospectionSupport.setProperties(jdbcTemplate, parameters, "template.");
 
         String query = remaining.replaceAll(parameterPlaceholderSubstitute, "?");
-        return new SqlEndpoint(uri, this, jdbcTemplate, query);
+
+        String onConsume = getAndRemoveParameter(parameters, "consumer.onConsume", String.class);
+        if (onConsume == null) {
+            onConsume = getAndRemoveParameter(parameters, "onConsume", String.class);
+        }
+        if (onConsume != null) {
+            onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
+        }
+
+        SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
+        endpoint.setOnConsume(onConsume);
+        return endpoint;
     }
 
     public void setDataSource(DataSource dataSource) {

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java Thu Jan 17 13:22:05 2013
@@ -45,20 +45,11 @@ public class SqlConsumer extends Schedul
     private final String query;
     private final JdbcTemplate jdbcTemplate;
 
-    /**
-     * Statement to run after data has been processed in the route
-     */
     private String onConsume;
-
-    /**
-     * Process resultset individually or as a list
-     */
     private boolean useIterator = true;
-
-    /**
-     * Whether allow empty resultset to be routed to the next hop
-     */
     private boolean routeEmptyResultSet;
+    private int expectedUpdateCount = -1;
+    private boolean breakBatchOnConsumeFail;
 
     private static final class DataHolder {
         private Exchange exchange;
@@ -92,9 +83,10 @@ public class SqlConsumer extends Schedul
             public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException, DataAccessException {
                 Queue<DataHolder> answer = new LinkedList<DataHolder>();
 
+                log.debug("Executing query: {}", preparedQuery);
                 ResultSet rs = preparedStatement.executeQuery();
                 try {
-                    log.trace("Got result list from query {}", rs);
+                    log.trace("Got result list from query: {}", rs);
 
                     RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
                     List<Map<String, Object>> data = mapper.extractData(rs);
@@ -166,19 +158,24 @@ public class SqlConsumer extends Schedul
             pendingExchanges = total - index - 1;
 
             // process the current exchange
-            log.debug("Processing exchange: {} with properties: {}", exchange, exchange.getProperties());
             getProcessor().process(exchange);
 
-            // TODO: support when with CAMEL-5977
-            /*
             try {
-                if (onConsume != null) {
-                    SqlEndpoint endpoint = (SqlEndpoint) getEndpoint();
-                    endpoint.getProcessingStrategy().commit(endpoint, exchange, data, jdbcTemplate, onConsume);
+                // we can only run on consume if there was data
+                if (onConsume != null && data != null) {
+                    int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, onConsume);
+                    if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
+                        String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + onConsume;
+                        throw new SQLException(msg);
+                    }
                 }
             } catch (Exception e) {
-                handleException(e);
-            }*/
+                if (breakBatchOnConsumeFail) {
+                    throw e;
+                } else {
+                    handleException("Error executing onConsume query " + onConsume, e);
+                }
+            }
         }
 
         return total;
@@ -231,5 +228,28 @@ public class SqlConsumer extends Schedul
         this.routeEmptyResultSet = routeEmptyResultSet;
     }
 
+    public int getExpectedUpdateCount() {
+        return expectedUpdateCount;
+    }
+
+    /**
+     * Sets an expected update count to validate when using onConsume.
+     *
+     * @param expectedUpdateCount typically set this value to <tt>1</tt> to expect 1 row updated.
+     */
+    public void setExpectedUpdateCount(int expectedUpdateCount) {
+        this.expectedUpdateCount = expectedUpdateCount;
+    }
+
+    public boolean isBreakBatchOnConsumeFail() {
+        return breakBatchOnConsumeFail;
+    }
+
+    /**
+     * Sets whether to break batch if onConsume failed.
+     */
+    public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
+        this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
+    }
 }
 

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java Thu Jan 17 13:22:05 2013
@@ -32,7 +32,8 @@ public interface SqlProcessingStrategy {
      * @param data         The original data delivered to the route
      * @param jdbcTemplate The JDBC template
      * @param query        The SQL query to execute
+     * @return the update count if the query returned an update count
      * @throws Exception can be thrown in case of error
      */
-    void commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception;
+    int commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception;
 }

Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java Thu Jan 17 13:22:05 2013
@@ -20,13 +20,14 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.builder.NotifyBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
 import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
 import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -34,16 +35,18 @@ import org.springframework.jdbc.datasour
 /**
  *
  */
-@Ignore
 public class SqlConsumerDeleteTest extends CamelTestSupport {
 
     private EmbeddedDatabase db;
+    private JdbcTemplate jdbcTemplate;
 
     @Before
     public void setUp() throws Exception {
         db = new EmbeddedDatabaseBuilder()
             .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
 
+        jdbcTemplate = new JdbcTemplate(db);
+
         super.setUp();
     }
 
@@ -70,6 +73,12 @@ public class SqlConsumerDeleteTest exten
         assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
         assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
         assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+        // give it a little tine to delete
+        Thread.sleep(500);
+
+        // there should only be 1 row in the table
+        assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
     }
 
     @Override
@@ -79,7 +88,7 @@ public class SqlConsumerDeleteTest exten
             public void configure() throws Exception {
                 getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
 
-                from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = #")
+                from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = :#id")
                     .to("mock:result");
             }
         };

Modified: camel/trunk/components/camel-sql/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/log4j.properties?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-sql/src/test/resources/log4j.properties Thu Jan 17 13:22:05 2013
@@ -16,7 +16,7 @@
 ## ------------------------------------------------------------------------
 
 #
-# The logging properties used for eclipse testing, We want to see debug output on the console.
+# The logging properties used for testing
 #
 log4j.rootLogger=INFO, file