You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/11/29 22:14:54 UTC
svn commit: r1208070 - in /camel/trunk/components/camel-sql/src:
main/java/org/apache/camel/component/sql/SqlEndpoint.java
main/java/org/apache/camel/component/sql/SqlProducer.java
test/java/org/apache/camel/component/sql/SqlRouteTest.java
Author: cmueller
Date: Tue Nov 29 21:14:52 2011
New Revision: 1208070
URL: http://svn.apache.org/viewvc?rev=1208070&view=rev
Log:
CAMEL-4662: add batching support to sql component
Thank you Daniel Gredler for the patch
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1208070&r1=1208069&r2=1208070&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java Tue Nov 29 21:14:52 2011
@@ -32,6 +32,7 @@ import org.springframework.jdbc.core.Jdb
public class SqlEndpoint extends DefaultEndpoint {
private JdbcTemplate jdbcTemplate;
private String query;
+ private boolean batch;
public SqlEndpoint() {
}
@@ -47,7 +48,7 @@ public class SqlEndpoint extends Default
}
public Producer createProducer() throws Exception {
- return new SqlProducer(this, query, jdbcTemplate);
+ return new SqlProducer(this, query, jdbcTemplate, batch);
}
public boolean isSingleton() {
@@ -70,6 +71,14 @@ public class SqlEndpoint extends Default
this.query = query;
}
+ public boolean isBatch() {
+ return batch;
+ }
+
+ public void setBatch(boolean batch) {
+ this.batch = batch;
+ }
+
@Override
protected String createEndpointUri() {
// Make sure it's properly encoded
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java?rev=1208070&r1=1208069&r2=1208070&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java Tue Nov 29 21:14:52 2011
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
-import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCallback;
@@ -33,47 +32,59 @@ import org.springframework.jdbc.core.Row
public class SqlProducer extends DefaultProducer {
private String query;
private JdbcTemplate jdbcTemplate;
+ private boolean batch;
- public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate) {
+ public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, boolean batch) {
super(endpoint);
this.jdbcTemplate = jdbcTemplate;
this.query = query;
+ this.batch = batch;
}
public void process(final Exchange exchange) throws Exception {
String queryHeader = exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class);
- jdbcTemplate.execute(queryHeader != null ? queryHeader : query, new PreparedStatementCallback<Map<?, ?>>() {
- public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
- int argNumber = 1;
+ String sql = queryHeader != null ? queryHeader : query;
- // number of parameters must match
+ jdbcTemplate.execute(sql, new PreparedStatementCallback<Map<?, ?>>() {
+ public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
int expected = ps.getParameterMetaData().getParameterCount();
- if (expected > 0 && exchange.getIn().getBody() != null) {
+ // transfer incoming message body data to prepared statement parameters, if necessary
+ if (exchange.getIn().getBody() != null) {
Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
- while (iterator != null && iterator.hasNext()) {
- Object value = iterator.next();
- log.trace("Setting parameter #{} with value: {}", argNumber, value);
- ps.setObject(argNumber, value);
- argNumber++;
+
+ if (batch) {
+ while (iterator != null && iterator.hasNext()) {
+ Object value = iterator.next();
+ Iterator<?> i = exchange.getContext().getTypeConverter().convertTo(Iterator.class, value);
+ populateStatement(ps, i, expected);
+ ps.addBatch();
+ }
+ } else {
+ populateStatement(ps, iterator, expected);
}
}
- if (argNumber - 1 != expected) {
- throw new SQLException("Number of parameters mismatch. Expected: " + expected + ", was:" + (argNumber - 1));
- }
-
- boolean isResultSet = ps.execute();
-
- if (isResultSet) {
- RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
- List<Map<String, Object>> result = mapper.extractData(ps.getResultSet());
- exchange.getOut().setBody(result);
- exchange.getIn().setHeader(SqlConstants.SQL_ROW_COUNT, result.size());
- // preserve headers
- exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ // execute the prepared statement and populate the outgoing message
+ if (batch) {
+ int[] updateCounts = ps.executeBatch();
+ int total = 0;
+ for (int count : updateCounts) {
+ total += count;
+ }
+ exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total);
} else {
- exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
+ boolean isResultSet = ps.execute();
+ if (isResultSet) {
+ RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
+ List<Map<String, Object>> result = mapper.extractData(ps.getResultSet());
+ exchange.getOut().setBody(result);
+ exchange.getIn().setHeader(SqlConstants.SQL_ROW_COUNT, result.size());
+ // preserve headers
+ exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ } else {
+ exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
+ }
}
// data is set on exchange so return null
@@ -82,4 +93,19 @@ public class SqlProducer extends Default
});
}
+ private void populateStatement(PreparedStatement ps, Iterator<?> iterator, int expectedParams) throws SQLException {
+ int argNumber = 1;
+ if (expectedParams > 0) {
+ while (iterator != null && iterator.hasNext()) {
+ Object value = iterator.next();
+ log.trace("Setting parameter #{} with value: {}", argNumber, value);
+ ps.setObject(argNumber, value);
+ argNumber++;
+ }
+ }
+
+ if (argNumber - 1 != expectedParams) {
+ throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was:" + (argNumber - 1));
+ }
+ }
}
Modified: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java?rev=1208070&r1=1208069&r2=1208070&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java Tue Nov 29 21:14:52 2011
@@ -17,6 +17,7 @@
package org.apache.camel.component.sql;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -33,10 +34,10 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
+import org.springframework.jdbc.UncategorizedSQLException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;
-
/**
* @version
*/
@@ -207,33 +208,78 @@ public class SqlRouteTest extends CamelT
assertEquals("Camel", row.get("PROJECT"));
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBatch() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ List<?> data = Arrays.asList(Arrays.asList(6, "abc", "def"), Arrays.asList(7, "ghi", "jkl"), Arrays.asList(8, "mno", "pqr"));
+ template.sendBody("direct:batch", data);
+ mock.assertIsSatisfied();
+ Number received = assertIsInstanceOf(Number.class, mock.getReceivedExchanges().get(0).getIn().getHeader(SqlConstants.SQL_UPDATE_COUNT));
+ assertEquals(3, received.intValue());
+ assertEquals("abc", jdbcTemplate.queryForObject("select project from projects where id = 6", String.class));
+ assertEquals("def", jdbcTemplate.queryForObject("select license from projects where id = 6", String.class));
+ assertEquals("ghi", jdbcTemplate.queryForObject("select project from projects where id = 7", String.class));
+ assertEquals("jkl", jdbcTemplate.queryForObject("select license from projects where id = 7", String.class));
+ assertEquals("mno", jdbcTemplate.queryForObject("select project from projects where id = 8", String.class));
+ assertEquals("pqr", jdbcTemplate.queryForObject("select license from projects where id = 8", String.class));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBatchMissingParamAtEnd() throws Exception {
+ try {
+ List<?> data = Arrays.asList(Arrays.asList(9, "stu", "vwx"), Arrays.asList(10, "yza"));
+ template.sendBody("direct:batch", data);
+ fail();
+ } catch (RuntimeCamelException e) {
+ assertTrue(e.getCause() instanceof UncategorizedSQLException);
+ }
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 9"));
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 10"));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBatchMissingParamAtBeginning() throws Exception {
+ try {
+ List<?> data = Arrays.asList(Arrays.asList(9, "stu"), Arrays.asList(10, "vwx", "yza"));
+ template.sendBody("direct:batch", data);
+ fail();
+ } catch (RuntimeCamelException e) {
+ assertTrue(e.getCause() instanceof UncategorizedSQLException);
+ }
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 9"));
+ assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 10"));
+ }
@Before
public void setUp() throws Exception {
Class.forName(driverClass);
- super.setUp();
-
+ ds = new SingleConnectionDataSource(url, user, password, true);
+
jdbcTemplate = new JdbcTemplate(ds);
- jdbcTemplate.execute("create table projects (id integer primary key,"
- + "project varchar(10), license varchar(5))");
+ jdbcTemplate.execute("create table projects (id integer primary key, project varchar(10), license varchar(5))");
jdbcTemplate.execute("insert into projects values (1, 'Camel', 'ASF')");
jdbcTemplate.execute("insert into projects values (2, 'AMQ', 'ASF')");
jdbcTemplate.execute("insert into projects values (3, 'Linux', 'XXX')");
+
+ super.setUp();
}
@After
public void tearDown() throws Exception {
super.tearDown();
- JdbcTemplate jdbcTemplate = new JdbcTemplate(ds);
+
jdbcTemplate.execute("drop table projects");
+ ((SingleConnectionDataSource) ds).destroy();
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
- ds = new SingleConnectionDataSource(url, user, password, true);
-
getContext().getComponent("sql", SqlComponent.class).setDataSource(ds);
errorHandler(noErrorHandler());
@@ -254,6 +300,10 @@ public class SqlRouteTest extends CamelT
from("direct:no-param").to("sql:select * from projects order by id").to("mock:result");
from("direct:no-param-insert").to("sql:insert into projects values (5, '#', param)?placeholder=param").to("mock:result");
+
+ from("direct:batch")
+ .to("sql:insert into projects values (#, #, #)?batch=true")
+ .to("mock:result");
}
};
}