You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/11/29 22:34:27 UTC

svn commit: r1208088 - in /camel/branches/camel-2.7.x/components/camel-sql/src: main/java/org/apache/camel/component/sql/SqlEndpoint.java main/java/org/apache/camel/component/sql/SqlProducer.java test/java/org/apache/camel/component/sql/SqlRouteTest.java

Author: cmueller
Date: Tue Nov 29 21:34:26 2011
New Revision: 1208088

URL: http://svn.apache.org/viewvc?rev=1208088&view=rev
Log:
CAMEL-4662: add batching support to sql component
Thank you Daniel Gredler for the patch

Modified:
    camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
    camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
    camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java

Modified: camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java?rev=1208088&r1=1208087&r2=1208088&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java (original)
+++ camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlEndpoint.java Tue Nov 29 21:34:26 2011
@@ -31,6 +31,7 @@ import org.springframework.jdbc.core.Jdb
 public class SqlEndpoint extends DefaultEndpoint {
     private JdbcTemplate jdbcTemplate;
     private String query;
+    private boolean batch;
 
     public SqlEndpoint() {
     }
@@ -46,7 +47,7 @@ public class SqlEndpoint extends Default
     }
 
     public Producer createProducer() throws Exception {
-        return new SqlProducer(this, query, jdbcTemplate);
+        return new SqlProducer(this, query, jdbcTemplate, batch);
     }
 
     public boolean isSingleton() {
@@ -69,6 +70,14 @@ public class SqlEndpoint extends Default
         this.query = query;
     }
 
+    public boolean isBatch() {
+        return batch;
+    }
+
+    public void setBatch(boolean batch) {
+        this.batch = batch;
+    }
+
     @Override
     protected String createEndpointUri() {
         return "sql:" + query;

Modified: camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java?rev=1208088&r1=1208087&r2=1208088&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java (original)
+++ camel/branches/camel-2.7.x/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java Tue Nov 29 21:34:26 2011
@@ -20,10 +20,10 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
-import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.ColumnMapRowMapper;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
@@ -32,44 +32,56 @@ import org.springframework.jdbc.core.Row
 public class SqlProducer extends DefaultProducer {
     private String query;
     private JdbcTemplate jdbcTemplate;
+    private boolean batch;
 
-    public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate) {
+    public SqlProducer(SqlEndpoint endpoint, String query, JdbcTemplate jdbcTemplate, boolean batch) {
         super(endpoint);
         this.jdbcTemplate = jdbcTemplate;
         this.query = query;
+        this.batch = batch;
     }
 
-    @SuppressWarnings("unchecked")
     public void process(final Exchange exchange) throws Exception {
-        jdbcTemplate.execute(query, new PreparedStatementCallback() {
-            public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
-                int argNumber = 1;
-
-                // number of parameters must match
+        jdbcTemplate.execute(query, new PreparedStatementCallback<Map<?, ?>>() {
+            public Map<?, ?> doInPreparedStatement(PreparedStatement ps) throws SQLException {
                 int expected = ps.getParameterMetaData().getParameterCount();
 
-                if (expected > 0 && exchange.getIn().getBody() != null) {
+                // transfer incoming message body data to prepared statement parameters, if necessary
+                if (exchange.getIn().getBody() != null) {
                     Iterator<?> iterator = exchange.getIn().getBody(Iterator.class);
-                    while (iterator != null && iterator.hasNext()) {
-                        ps.setObject(argNumber++, iterator.next());
+                    
+                    if (batch) {
+                        while (iterator != null && iterator.hasNext()) {
+                            Object value = iterator.next();
+                            Iterator<?> i = exchange.getContext().getTypeConverter().convertTo(Iterator.class, value);
+                            populateStatement(ps, i, expected);
+                            ps.addBatch();
+                        }
+                    } else {
+                        populateStatement(ps, iterator, expected);
                     }
                 }
 
-                if (argNumber - 1 != expected) {
-                    throw new SQLException("Number of parameters mismatch. Expected: " + expected + ", was:" + (argNumber - 1));
-                }
-                
-                boolean isResultSet = ps.execute();
-                
-                if (isResultSet) {
-                    RowMapperResultSetExtractor mapper = new RowMapperResultSetExtractor(new ColumnMapRowMapper());
-                    List<?> result = (List<?>) mapper.extractData(ps.getResultSet());
-                    exchange.getOut().setBody(result);
-                    exchange.getIn().setHeader(SqlConstants.SQL_ROW_COUNT, result.size());
-                    // preserve headers
-                    exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+                // execute the prepared statement and populate the outgoing message
+                if (batch) {
+                    int[] updateCounts = ps.executeBatch();
+                    int total = 0;
+                    for (int count : updateCounts) {
+                        total += count;
+                    }
+                    exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, total);
                 } else {
-                    exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
+                    boolean isResultSet = ps.execute();
+                    if (isResultSet) {
+                        RowMapperResultSetExtractor<Map<String, Object>> mapper = new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
+                        List<Map<String, Object>> result = mapper.extractData(ps.getResultSet());
+                        exchange.getOut().setBody(result);
+                        exchange.getIn().setHeader(SqlConstants.SQL_ROW_COUNT, result.size());
+                        // preserve headers
+                        exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+                    } else {
+                        exchange.getIn().setHeader(SqlConstants.SQL_UPDATE_COUNT, ps.getUpdateCount());
+                    }
                 }
 
                 // data is set on exchange so return null
@@ -78,4 +90,19 @@ public class SqlProducer extends Default
         });
     }
 
+    private void populateStatement(PreparedStatement ps, Iterator<?> iterator, int expectedParams) throws SQLException {
+        int argNumber = 1;
+        if (expectedParams > 0) {
+            while (iterator != null && iterator.hasNext()) {
+                Object value = iterator.next();
+                log.trace("Setting parameter #{} with value: {}", argNumber, value);
+                ps.setObject(argNumber, value);
+                argNumber++;
+            }
+        }
+        
+        if (argNumber - 1 != expectedParams) {
+            throw new SQLException("Number of parameters mismatch. Expected: " + expectedParams + ", was:" + (argNumber - 1));
+        }
+    }
 }

Modified: camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java?rev=1208088&r1=1208087&r2=1208088&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java (original)
+++ camel/branches/camel-2.7.x/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java Tue Nov 29 21:34:26 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.component.sql;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -31,6 +32,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.springframework.dao.DataAccessException;
 import org.springframework.dao.EmptyResultDataAccessException;
+import org.springframework.jdbc.UncategorizedSQLException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.datasource.SingleConnectionDataSource;
 
@@ -176,33 +178,78 @@ public class SqlRouteTest extends CamelT
         assertEquals("Camel", row.get("PROJECT"));
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testBatch() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        List<?> data = Arrays.asList(Arrays.asList(6, "abc", "def"), Arrays.asList(7, "ghi", "jkl"), Arrays.asList(8, "mno", "pqr"));
+        template.sendBody("direct:batch", data);
+        mock.assertIsSatisfied();
+        Number received = assertIsInstanceOf(Number.class, mock.getReceivedExchanges().get(0).getIn().getHeader(SqlConstants.SQL_UPDATE_COUNT));
+        assertEquals(3, received.intValue());
+        assertEquals("abc", jdbcTemplate.queryForObject("select project from projects where id = 6", String.class));
+        assertEquals("def", jdbcTemplate.queryForObject("select license from projects where id = 6", String.class));
+        assertEquals("ghi", jdbcTemplate.queryForObject("select project from projects where id = 7", String.class));
+        assertEquals("jkl", jdbcTemplate.queryForObject("select license from projects where id = 7", String.class));
+        assertEquals("mno", jdbcTemplate.queryForObject("select project from projects where id = 8", String.class));
+        assertEquals("pqr", jdbcTemplate.queryForObject("select license from projects where id = 8", String.class));
+    }
+    
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testBatchMissingParamAtEnd() throws Exception {
+        try {
+            List<?> data = Arrays.asList(Arrays.asList(9, "stu", "vwx"), Arrays.asList(10, "yza"));
+            template.sendBody("direct:batch", data);
+            fail();
+        } catch (RuntimeCamelException e) {
+            assertTrue(e.getCause() instanceof UncategorizedSQLException);
+        }
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 9"));
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 10"));
+    }
+    
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testBatchMissingParamAtBeginning() throws Exception {
+        try {
+            List<?> data = Arrays.asList(Arrays.asList(9, "stu"), Arrays.asList(10, "vwx", "yza"));
+            template.sendBody("direct:batch", data);
+            fail();
+        } catch (RuntimeCamelException e) {
+            assertTrue(e.getCause() instanceof UncategorizedSQLException);
+        }
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 9"));
+        assertEquals(0, jdbcTemplate.queryForInt("select count(*) from projects where id = 10"));
+    }
     
     @Before
     public void setUp() throws Exception {
         Class.forName(driverClass);
-        super.setUp();
-
+        ds = new SingleConnectionDataSource(url, user, password, true);
+        
         jdbcTemplate = new JdbcTemplate(ds);
-        jdbcTemplate.execute("create table projects (id integer primary key,"
-                             + "project varchar(10), license varchar(5))");
+        jdbcTemplate.execute("create table projects (id integer primary key, project varchar(10), license varchar(5))");
         jdbcTemplate.execute("insert into projects values (1, 'Camel', 'ASF')");
         jdbcTemplate.execute("insert into projects values (2, 'AMQ', 'ASF')");
         jdbcTemplate.execute("insert into projects values (3, 'Linux', 'XXX')");
+        
+        super.setUp();
     }
 
     @After
     public void tearDown() throws Exception {
         super.tearDown();
-        JdbcTemplate jdbcTemplate = new JdbcTemplate(ds);
+        
         jdbcTemplate.execute("drop table projects");
+        ((SingleConnectionDataSource) ds).destroy();
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() {
-                ds = new SingleConnectionDataSource(url, user, password, true);
-
                 getContext().getComponent("sql", SqlComponent.class).setDataSource(ds);
 
                 errorHandler(noErrorHandler());
@@ -223,6 +270,10 @@ public class SqlRouteTest extends CamelT
                 from("direct:no-param").to("sql:select * from projects order by id").to("mock:result");
                 
                 from("direct:no-param-insert").to("sql:insert into projects values (5, '#', param)?placeholder=param").to("mock:result");
+                
+                from("direct:batch")
+                    .to("sql:insert into projects values (#, #, #)?batch=true")
+                    .to("mock:result");
             }
         };
     }