You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/01/17 14:22:05 UTC
svn commit: r1434662 - in /camel/trunk/components/camel-sql/src:
main/java/org/apache/camel/component/sql/
test/java/org/apache/camel/component/sql/ test/resources/
Author: davsclaus
Date: Thu Jan 17 13:22:05 2013
New Revision: 1434662
URL: http://svn.apache.org/viewvc?rev=1434662&view=rev
Log:
CAMEL-5976: camel-sql consumer can now do onConsume to delete row after processing etc.
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
camel/trunk/components/camel-sql/src/test/resources/log4j.properties
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/DefaultSqlProcessingStrategy.java Thu Jan 17 13:22:05 2013
@@ -19,7 +19,6 @@ package org.apache.camel.component.sql;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
-import java.util.Map;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
@@ -35,75 +34,30 @@ public class DefaultSqlProcessingStrateg
private static final Logger LOG = LoggerFactory.getLogger(DefaultSqlProcessingStrategy.class);
@Override
- public void commit(SqlEndpoint endpoint, final Exchange exchange, Object data, JdbcTemplate jdbcTemplate, final String query) throws Exception {
- jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>() {
- public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
+ public int commit(final SqlEndpoint endpoint, final Exchange exchange, final Object data, final JdbcTemplate jdbcTemplate, final String query) throws Exception {
+
+ final String preparedQuery = endpoint.getPrepareStatementStrategy().prepareQuery(query, endpoint.isAllowNamedParameters());
+
+ return jdbcTemplate.execute(preparedQuery, new PreparedStatementCallback<Integer>() {
+ public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
int expected = ps.getParameterMetaData().getParameterCount();
- Iterator<?> iterator = createIterator(exchange, query, expected);
+ Iterator<?> iterator = endpoint.getPrepareStatementStrategy().createPopulateIterator(query, preparedQuery, expected, exchange, data);
if (iterator != null) {
- populateStatement(ps, iterator, expected);
+ endpoint.getPrepareStatementStrategy().populateStatement(ps, iterator, expected);
LOG.trace("Execute query {}", query);
ps.execute();
- }
-
- return null;
- };
- });
- }
- private Iterator<?> createIterator(Exchange exchange, final String query, final int expectedParams) {
- Object body = exchange.getIn().getBody();
- if (body == null) {
- return null;
- }
-
- // TODO: support named parameters
-/*
- if (body instanceof Map) {
- final Map map = (Map) body;
- return new Iterator() {
-
- private int current;
-
- @Override
- public boolean hasNext() {
- return current < expectedParams;
+ int updateCount = ps.getUpdateCount();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Update count {}", updateCount);
+ }
+ return updateCount;
}
- @Override
- public Object next() {
- current++;
- // TODO: Fix me
- return map.get("ID");
- }
-
- @Override
- public void remove() {
- // noop
- }
+ return 0;
};
- }*/
-
- // else force as iterator based
- Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
- return iterator;
- }
-
- private void populateStatement(PreparedStatement ps, Iterator<?> iterator, int expectedParams) throws SQLException {
- int argNumber = 1;
- if (expectedParams > 0) {
- while (iterator != null && iterator.hasNext()) {
- Object value = iterator.next();
- LOG.trace("Setting parameter #{} with value: {}", argNumber, value);
- ps.setObject(argNumber, value);
- argNumber++;
- }
- }
-
- if (argNumber - 1 != expectedParams) {
- throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was:" + (argNumber - 1));
- }
+ });
}
}
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java Thu Jan 17 13:22:05 2013
@@ -51,7 +51,18 @@ public class SqlComponent extends Defaul
IntrospectionSupport.setProperties(jdbcTemplate, parameters, "template.");
String query = remaining.replaceAll(parameterPlaceholderSubstitute, "?");
- return new SqlEndpoint(uri, this, jdbcTemplate, query);
+
+ String onConsume = getAndRemoveParameter(parameters, "consumer.onConsume", String.class);
+ if (onConsume == null) {
+ onConsume = getAndRemoveParameter(parameters, "onConsume", String.class);
+ }
+ if (onConsume != null) {
+ onConsume = onConsume.replaceAll(parameterPlaceholderSubstitute, "?");
+ }
+
+ SqlEndpoint endpoint = new SqlEndpoint(uri, this, jdbcTemplate, query);
+ endpoint.setOnConsume(onConsume);
+ return endpoint;
}
public void setDataSource(DataSource dataSource) {
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java Thu Jan 17 13:22:05 2013
@@ -45,20 +45,11 @@ public class SqlConsumer extends Schedul
private final String query;
private final JdbcTemplate jdbcTemplate;
- /**
- * Statement to run after data has been processed in the route
- */
private String onConsume;
-
- /**
- * Process resultset individually or as a list
- */
private boolean useIterator = true;
-
- /**
- * Whether allow empty resultset to be routed to the next hop
- */
private boolean routeEmptyResultSet;
+ private int expectedUpdateCount = -1;
+ private boolean breakBatchOnConsumeFail;
private static final class DataHolder {
private Exchange exchange;
@@ -92,9 +83,10 @@ public class SqlConsumer extends Schedul
public Integer doInPreparedStatement(PreparedStatement preparedStatement) throws SQLException, DataAccessException {
Queue<DataHolder> answer = new LinkedList<DataHolder>();
+ log.debug("Executing query: {}", preparedQuery);
ResultSet rs = preparedStatement.executeQuery();
try {
- log.trace("Got result list from query {}", rs);
+ log.trace("Got result list from query: {}", rs);
RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
List<Map<String, Object>> data = mapper.extractData(rs);
@@ -166,19 +158,24 @@ public class SqlConsumer extends Schedul
pendingExchanges = total - index - 1;
// process the current exchange
- log.debug("Processing exchange: {} with properties: {}", exchange, exchange.getProperties());
getProcessor().process(exchange);
- // TODO: support when with CAMEL-5977
- /*
try {
- if (onConsume != null) {
- SqlEndpoint endpoint = (SqlEndpoint) getEndpoint();
- endpoint.getProcessingStrategy().commit(endpoint, exchange, data, jdbcTemplate, onConsume);
+ // we can only run on consume if there was data
+ if (onConsume != null && data != null) {
+ int updateCount = getEndpoint().getProcessingStrategy().commit(getEndpoint(), exchange, data, jdbcTemplate, onConsume);
+ if (expectedUpdateCount > -1 && updateCount != expectedUpdateCount) {
+ String msg = "Expected update count " + expectedUpdateCount + " but was " + updateCount + " executing query: " + onConsume;
+ throw new SQLException(msg);
+ }
}
} catch (Exception e) {
- handleException(e);
- }*/
+ if (breakBatchOnConsumeFail) {
+ throw e;
+ } else {
+ handleException("Error executing onConsume query " + onConsume, e);
+ }
+ }
}
return total;
@@ -231,5 +228,28 @@ public class SqlConsumer extends Schedul
this.routeEmptyResultSet = routeEmptyResultSet;
}
+ public int getExpectedUpdateCount() {
+ return expectedUpdateCount;
+ }
+
+ /**
+ * Sets an expected update count to validate when using onConsume.
+ *
+ * @param expectedUpdateCount typically set this value to <tt>1</tt> to expect 1 row updated.
+ */
+ public void setExpectedUpdateCount(int expectedUpdateCount) {
+ this.expectedUpdateCount = expectedUpdateCount;
+ }
+
+ public boolean isBreakBatchOnConsumeFail() {
+ return breakBatchOnConsumeFail;
+ }
+
+ /**
+ * Sets whether to break batch if onConsume failed.
+ */
+ public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
+ this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
+ }
}
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProcessingStrategy.java Thu Jan 17 13:22:05 2013
@@ -32,7 +32,8 @@ public interface SqlProcessingStrategy {
* @param data The original data delivered to the route
* @param jdbcTemplate The JDBC template
* @param query The SQL query to execute
+ * @return the update count if the query returned an update count
* @throws Exception can be thrown in case of error
*/
- void commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception;
+ int commit(SqlEndpoint endpoint, Exchange exchange, Object data, JdbcTemplate jdbcTemplate, String query) throws Exception;
}
Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerDeleteTest.java Thu Jan 17 13:22:05 2013
@@ -20,13 +20,14 @@ import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
+import org.apache.camel.builder.NotifyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@@ -34,16 +35,18 @@ import org.springframework.jdbc.datasour
/**
*
*/
-@Ignore
public class SqlConsumerDeleteTest extends CamelTestSupport {
private EmbeddedDatabase db;
+ private JdbcTemplate jdbcTemplate;
@Before
public void setUp() throws Exception {
db = new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
+ jdbcTemplate = new JdbcTemplate(db);
+
super.setUp();
}
@@ -70,6 +73,12 @@ public class SqlConsumerDeleteTest exten
assertEquals("AMQ", exchanges.get(1).getIn().getBody(Map.class).get("PROJECT"));
assertEquals(3, exchanges.get(2).getIn().getBody(Map.class).get("ID"));
assertEquals("Linux", exchanges.get(2).getIn().getBody(Map.class).get("PROJECT"));
+
+ // give it a little time to delete
+ Thread.sleep(500);
+
+ // there should be no rows left in the table
+ assertEquals("Should have deleted all 3 rows", 0, jdbcTemplate.queryForInt("select count(*) from projects"));
}
@Override
@@ -79,7 +88,7 @@ public class SqlConsumerDeleteTest exten
public void configure() throws Exception {
getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
- from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = #")
+ from("sql:select * from projects order by id?consumer.onConsume=delete from projects where id = :#id")
.to("mock:result");
}
};
Modified: camel/trunk/components/camel-sql/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/log4j.properties?rev=1434662&r1=1434661&r2=1434662&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-sql/src/test/resources/log4j.properties Thu Jan 17 13:22:05 2013
@@ -16,7 +16,7 @@
## ------------------------------------------------------------------------
#
-# The logging properties used for eclipse testing, We want to see debug output on the console.
+# The logging properties used for testing
#
log4j.rootLogger=INFO, file