You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/01/18 13:48:25 UTC
svn commit: r1435118 - in /camel/trunk/components/camel-sql/src:
main/java/org/apache/camel/component/sql/
test/java/org/apache/camel/component/sql/
Author: davsclaus
Date: Fri Jan 18 12:48:24 2013
New Revision: 1435118
URL: http://svn.apache.org/viewvc?rev=1435118&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after processing etc.
Added:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java
- copied, changed from r1435070, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java Fri Jan 18 12:48:24 2013
@@ -31,6 +31,7 @@ import org.springframework.jdbc.core.Jdb
*/
public class SqlComponent extends DefaultComponent {
private DataSource dataSource;
+ private boolean usePlaceholder = true;
public SqlComponent() {
}
@@ -56,19 +57,27 @@ public class SqlComponent extends Defaul
if (onConsume == null) {
onConsume = getAndRemoveParameter(parameters, "onConsume", String.class);
}
- if (onConsume != null) {
+ if (onConsume != null && usePlaceholder) {
onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
}
+ String onConsumeFailed = getAndRemoveParameter(parameters, "consumer.onConsumeFailed", String.class);
+ if (onConsumeFailed == null) {
+ onConsumeFailed = getAndRemoveParameter(parameters, "onConsumeFailed", String.class);
+ }
+ if (onConsumeFailed != null && usePlaceholder) {
+ onConsumeFailed = onConsumeFailed.replaceAll(parameterPlaceholderSubstitute, "?");
+ }
String onConsumeBatchComplete = getAndRemoveParameter(parameters, "consumer.onConsumeBatchComplete", String.class);
if (onConsumeBatchComplete == null) {
onConsumeBatchComplete = getAndRemoveParameter(parameters, "onConsumeBatchComplete", String.class);
}
- if (onConsumeBatchComplete != null) {
+ if (onConsumeBatchComplete != null && usePlaceholder) {
onConsumeBatchComplete = onConsumeBatchComplete.replaceAll(parameterPlaceholderSubstitute, "?");
}
SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
endpoint.setOnConsume(onConsume);
+ endpoint.setOnConsumeFailed(onConsumeFailed);
endpoint.setOnConsumeBatchComplete(onConsumeBatchComplete);
return endpoint;
}
@@ -80,4 +89,17 @@ public class SqlComponent extends Defaul
public DataSource getDataSource() {
return dataSource;
}
+
+ public boolean isUsePlaceholder() {
+ return usePlaceholder;
+ }
+
+ /**
+ * Sets whether to use placeholder and replace all placeholder characters with ? sign in the SQL queries.
+ * <p/>
+ * This option is default <tt>true</tt>
+ */
+ public void setUsePlaceholder(boolean usePlaceholder) {
+ this.usePlaceholder = usePlaceholder;
+ }
}
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java Fri Jan 18 12:48:24 2013
@@ -46,6 +46,7 @@ public class SqlConsumer extends Schedul
private final JdbcTemplate jdbcTemplate;
private String onConsume;
+ private String onConsumeFailed;
private String onConsumeBatchComplete;
private boolean useIterator = true;
private boolean routeEmptyResultSet;
@@ -159,14 +160,20 @@ public class SqlConsumer extends Schedul
pendingExchanges = total - index - 1;
// process the current exchange
- getProcessor().process(exchange);
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ // pick the on consume to use
+ String sql = exchange.isFailed() ? onConsumeFailed : onConsume;
try {
// we can only run on consume if there was data
- if (onConsume != null && data != null) {
- int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, onConsume);
+ if (data != null && sql != null) {
+ int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, sql);
if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
- String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + onConsume;
+ String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + sql;
throw new SQLException(msg);
}
}
@@ -174,7 +181,7 @@ public class SqlConsumer extends Schedul
if (breakBatchOnConsumeFail) {
throw e;
} else {
- handleException("Error executing onConsume query " + onConsume, e);
+ handleException("Error executing onConsume/onConsumeFailed query " + sql, e);
}
}
}
@@ -195,22 +202,28 @@ public class SqlConsumer extends Schedul
return total;
}
- /**
- * Gets the statement(s) to run after successful processing.
- * Use comma to separate multiple statements.
- */
public String getOnConsume() {
return onConsume;
}
/**
- * Sets the statement to run after successful processing.
- * Use comma to separate multiple statements.
+ * Sets a SQL to execute when the row has been successfully processed.
*/
public void setOnConsume(String onConsume) {
this.onConsume = onConsume;
}
+ public String getOnConsumeFailed() {
+ return onConsumeFailed;
+ }
+
+ /**
+ * Sets a SQL to execute when the row failed being processed.
+ */
+ public void setOnConsumeFailed(String onConsumeFailed) {
+ this.onConsumeFailed = onConsumeFailed;
+ }
+
public String getOnConsumeBatchComplete() {
return onConsumeBatchComplete;
}
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java Fri Jan 18 12:48:24 2013
@@ -37,6 +37,7 @@ public class SqlEndpoint extends Default
private SqlProcessingStrategy processingStrategy = new DefaultSqlProcessingStrategy();
private SqlPrepareStatementStrategy prepareStatementStrategy = new DefaultSqlPrepareStatementStrategy();
private String onConsume;
+ private String onConsumeFailed;
private String onConsumeBatchComplete;
private boolean allowNamedParameters = true;
@@ -53,6 +54,7 @@ public class SqlEndpoint extends Default
SqlConsumer consumer = new SqlConsumer(this, processor, jdbcTemplate, query);
consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
consumer.setOnConsume(getOnConsume());
+ consumer.setOnConsumeFailed(getOnConsumeFailed());
consumer.setOnConsumeBatchComplete(getOnConsumeBatchComplete());
configureConsumer(consumer);
return consumer;
@@ -122,6 +124,14 @@ public class SqlEndpoint extends Default
this.onConsume = onConsume;
}
+ public String getOnConsumeFailed() {
+ return onConsumeFailed;
+ }
+
+ public void setOnConsumeFailed(String onConsumeFailed) {
+ this.onConsumeFailed = onConsumeFailed;
+ }
+
public String getOnConsumeBatchComplete() {
return onConsumeBatchComplete;
}
Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteBatchCompleteTest.java Fri Jan 18 12:48:24 2013
@@ -60,7 +60,7 @@ public class SqlConsumerDeleteBatchCompl
assertMockEndpointsSatisfied();
// give it a little tine to delete
- Thread.sleep(500);
+ Thread.sleep(1000);
assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}
Copied: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java (from r1435070, camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java?p2=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java&p1=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java&r1=1435070&r2=1435118&rev=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteFailedTest.java Fri Jan 18 12:48:24 2013
@@ -20,7 +20,7 @@ import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
-import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
@@ -35,7 +35,7 @@ import org.springframework.jdbc.datasour
/**
*
*/
-public class SqlConsumerDeleteTest extends CamelTestSupport {
+public class SqlConsumerDeleteFailedTest extends CamelTestSupport {
private EmbeddedDatabase db;
private JdbcTemplate jdbcTemplate;
@@ -60,24 +60,23 @@ public class SqlConsumerDeleteTest exten
@Test
public void testConsume() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(3);
+ mock.expectedMessageCount(2);
assertMockEndpointsSatisfied();
List<Exchange> exchanges = mock.getReceivedExchanges();
- assertEquals(3, exchanges.size());
+ assertEquals(2, exchanges.size());
assertEquals(1, exchanges.get(0).getIn().getBody(Map.class).get("ID"));
assertEquals("Camel", exchanges.get(0).getIn().getBody(Map.class).get("PROJECT"));
- assertEquals(2, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
- assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
- assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
- assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+ assertEquals(3, exchanges.get(1).getIn().getBody(Map.class).get("ID"));
+ assertEquals("Linux", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
// give it a little tine to delete
- Thread.sleep(500);
+ Thread.sleep(1000);
- assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
+ assertEquals("Should have deleted 2 rows", 1, jdbcTemplate.queryForInt("select count(*) from projects"));
+ assertEquals("Should be AMQ project that is BAD", "AMQ", jdbcTemplate.queryForObject("select PROJECT from projects where license = 'BAD'", String.class));
}
@Override
@@ -87,7 +86,18 @@ public class SqlConsumerDeleteTest exten
public void configure() throws Exception {
getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
- from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = :#id")
+ from("sql:select * from projects where license <> 'BAD' order by id"
+ + "?consumer.onConsume=delete from projects where id = :#id"
+ + "&consumer.onConsumeFailed=update projects set license = 'BAD' where id = :#id")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Object project = exchange.getIn().getBody(Map.class).get("PROJECT");
+ if ("AMQ".equals(project)) {
+ throw new IllegalArgumentException("Cannot handled AMQ");
+ }
+ }
+ })
.to("mock:result");
}
};
Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java Fri Jan 18 12:48:24 2013
@@ -75,7 +75,7 @@ public class SqlConsumerDeleteTest exten
assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
// give it a little tine to delete
- Thread.sleep(500);
+ Thread.sleep(1000);
assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}
Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java?rev=1435118&r1=1435117&r2=1435118&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTransformTest.java Fri Jan 18 12:48:24 2013
@@ -60,7 +60,7 @@ public class SqlConsumerDeleteTransformT
assertMockEndpointsSatisfied();
// give it a little tine to delete
- Thread.sleep(500);
+ Thread.sleep(1000);
assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}