You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/01/18 13:48:25 UTC

svn commit: r1435118 - in /camel/trunk/components/camel-sql/src: main/java/org/apache/camel/component/sql/ test/java/org/apache/camel/component/sql/

Author: davsclaus
Date: Fri Jan 18 12:48:24 2013
New Revision: 1435118

URL: http://svn.apache.org/viewvc?rev=1435118&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after processing etc.

Added:
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java
      - copied, changed from r1435070, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Modified:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java Fri Jan 18 12:48:24 2013
@@ -31,6 +31,7 @@ import org.springframework.jdbc.core.Jdb
  */
 public class SqlComponent extends DefaultComponent {
     private DataSource dataSource;
+    private boolean usePlaceholder = true;
 
     public SqlComponent() {
     }
@@ -56,19 +57,27 @@ public class SqlComponent extends Defaul
         if (onConsume == null) {
             onConsume = getAndRemoveParameter(parameters, "onConsume", String.class);
         }
-        if (onConsume != null) {
+        if (onConsume != null && usePlaceholder) {
             onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
         }
+        String onConsumeFailed = getAndRemoveParameter(parameters, "consumer.onConsumeFailed", String.class);
+        if (onConsumeFailed == null) {
+            onConsumeFailed = getAndRemoveParameter(parameters, "onConsumeFailed", String.class);
+        }
+        if (onConsumeFailed != null && usePlaceholder) {
+            onConsumeFailed = onConsumeFailed.replaceAll(parameterPlaceholderSubstitute, "?");
+        }
         String onConsumeBatchComplete = getAndRemoveParameter(parameters, "consumer.onConsumeBatchComplete", String.class);
         if (onConsumeBatchComplete == null) {
             onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete", String.class);
         }
-        if (onConsumeBatchComplete != null) {
+        if (onConsumeBatchComplete != null && usePlaceholder) {
             onConsumeBatchComplete = onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute, "?");
         }
 
         SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
         endpoint.setOnConsume(onConsume);
+        endpoint.setOnConsumeFailed(onConsumeFailed);
         endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
         return endpoint;
     }
@@ -80,4 +89,17 @@ public class SqlComponent extends Defaul
     public DataSource getDataSource() {
         return dataSource;
     }
+
+    public boolean isUsePlaceholder() {
+        return usePlaceholder;
+    }
+
+    /**
+     * Sets whether to replace all placeholder characters in the SQL queries with the <tt>?</tt> sign.
+     * <p/>
+     * The default value is <tt>true</tt>.
+     */
+    public void setUsePlaceholder(boolean usePlaceholder) {
+        this.usePlaceholder = usePlaceholder;
+    }
 }

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java Fri Jan 18 12:48:24 2013
@@ -46,6 +46,7 @@ public class SqlConsumer extends Schedul
     private final JdbcTemplate jdbcTemplate;
 
     private String onConsume;
+    private String onConsumeFailed;
     private String onConsumeBatchComplete;
     private boolean useIterator = true;
     private boolean routeEmptyResultSet;
@@ -159,14 +160,20 @@ public class SqlConsumer extends Schedul
             pendingExchanges = total - index - 1;
 
             // process the current exchange
-            getProcessor().process(exchange);
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
 
+            // pick the on consume to use
+            String sql = exchange.isFailed() ? onConsumeFailed : onConsume;
             try {
                 // we can only run on consume if there was data
-                if (onConsume != null && data != null) {
-                    int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, onConsume);
+                if (data != null && sql != null) {
+                    int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
                     if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
-                        String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + onConsume;
+                        String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + sql;
                         throw new SQLException(msg);
                     }
                 }
@@ -174,7 +181,7 @@ public class SqlConsumer extends Schedul
                 if (breakBatchOnConsumeFail) {
                     throw e;
                 } else {
-                    handleException("Error executing onConsume query " + onConsume, e);
+                    handleException("Error executing onConsume/onConsumeFailed query " + sql, e);
                 }
             }
         }
@@ -195,22 +202,28 @@ public class SqlConsumer extends Schedul
         return total;
     }
 
-    /**
-     * Gets the statement(s) to run after successful processing.
-     * Use comma to separate multiple statements.
-     */
     public String getOnConsume() {
         return onConsume;
     }
 
     /**
-     * Sets the statement to run after successful processing.
-     * Use comma to separate multiple statements.
+     * Sets a SQL statement to execute when the row has been successfully processed.
      */
     public void setOnConsume(String onConsume) {
         this.onConsume = onConsume;
     }
 
+    public String getOnConsumeFailed() {
+        return onConsumeFailed;
+    }
+
+    /**
+     * Sets a SQL statement to execute when processing of the row failed.
+     */
+    public void setOnConsumeFailed(String onConsumeFailed) {
+        this.onConsumeFailed = onConsumeFailed;
+    }
+
     public String getOnConsumeBatchComplete() {
         return onConsumeBatchComplete;
     }

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java Fri Jan 18 12:48:24 2013
@@ -37,6 +37,7 @@ public class SqlEndpoint extends Default
     private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
     private SqlPrepareStatementStrategy prepareStatementStrategy = new DefaultSqlPrepareStatementStrategy();
     private String onConsume;
+    private String onConsumeFailed;
     private String onConsumeBatchComplete;
     private boolean allowNamedParameters = true;
 
@@ -53,6 +54,7 @@ public class SqlEndpoint extends Default
         SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
         consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
         consumer.setOnConsume(getOnConsume());
+        consumer.setOnConsumeFailed(getOnConsumeFailed());
         consumer.setOnConsumeBatchComplete(getOnConsumeBatchComplete());
         configureConsumer(consumer);
         return consumer;
@@ -122,6 +124,14 @@ public class SqlEndpoint extends Default
         this.onConsume = onConsume;
     }
 
+    public String getOnConsumeFailed() {
+        return onConsumeFailed;
+    }
+
+    public void setOnConsumeFailed(String onConsumeFailed) {
+        this.onConsumeFailed = onConsumeFailed;
+    }
+
     public String getOnConsumeBatchComplete() {
         return onConsumeBatchComplete;
     }

Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java Fri Jan 18 12:48:24 2013
@@ -60,7 +60,7 @@ public class SqlConsumerDeleteBatchCompl
         assertMockEndpointsSatisfied();
 
         // give it a little tine to delete
-        Thread.sleep(500);
+        Thread.sleep(1000);
 
         assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
     }

Copied: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java (from r1435070, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java&r1=1435070&r2=1435118&rev=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java Fri Jan 18 12:48:24 2013
@@ -20,7 +20,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -35,7 +35,7 @@ import org.springframework.jdbc.datasour
 /**
  *
  */
-public class SqlConsumerDeleteTest extends CamelTestSupport {
+public class SqlConsumerDeleteFailedTest extends CamelTestSupport {
 
     private EmbeddedDatabase db;
     private JdbcTemplate jdbcTemplate;
@@ -60,24 +60,23 @@ public class SqlConsumerDeleteTest exten
     @Test
     public void testConsume() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(3);
+        mock.expectedMessageCount(2);
 
         assertMockEndpointsSatisfied();
 
         List<Exchange> exchanges = mock.getReceivedExchanges();
-        assertEquals(3, exchanges.size());
+        assertEquals(2, exchanges.size());
 
         assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
         assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
-        assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
-        assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
-        assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+        assertEquals(3, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+        assertEquals("Linux", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
 
         // give it a little tine to delete
-        Thread.sleep(500);
+        Thread.sleep(1000);
 
-        assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
+        assertEquals("Should have deleted 2 rows", 1, jdbcTemplate.queryForInt("select count(*) from projects"));
+        assertEquals("Should be AMQ project that is BAD", "AMQ", jdbcTemplate.queryForObject("select PROJECT from projects where license = 'BAD'", String.class));
     }
 
     @Override
@@ -87,7 +86,18 @@ public class SqlConsumerDeleteTest exten
             public void configure() throws Exception {
                 getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
 
-                from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = :#id")
+                from("sql:select * from projects where license <> 'BAD' order by id"
+                        + "?consumer.onConsume=delete from projects where id = :#id"
+                        + "&consumer.onConsumeFailed=update projects set license = 'BAD' where id = :#id")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            Object project = exchange.getIn().getBody(Map.class).get("PROJECT");
+                            if ("AMQ".equals(project)) {
+                                throw new IllegalArgumentException("Cannot handled AMQ");
+                            }
+                        }
+                    })
                     .to("mock:result");
             }
         };

Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java Fri Jan 18 12:48:24 2013
@@ -75,7 +75,7 @@ public class SqlConsumerDeleteTest exten
         assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
 
         // give it a little tine to delete
-        Thread.sleep(500);
+        Thread.sleep(1000);
 
         assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
     }

Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java Fri Jan 18 12:48:24 2013
@@ -60,7 +60,7 @@ public class SqlConsumerDeleteTransformT
         assertMockEndpointsSatisfied();
 
         // give it a little tine to delete
-        Thread.sleep(500);
+        Thread.sleep(1000);
 
         assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
     }