You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2011/04/27 08:44:44 UTC

Re: svn commit: r1096880 - in /camel/trunk/components/camel-jdbc/src: main/java/org/apache/camel/component/jdbc/ test/java/org/apache/camel/component/jdbc/

Hi

I think if we should behave nicely you ought to remember the
autoCommit setting on the Connection since its often from a connection
pool. And thus returned back to the pool. And in case we may have
"altered" its default settings.

So if the connection pool is shared and use by other JDBC access
(maybe JPA) and they rely on auto commit being true. Then we just
changed that to false :).

But I dont think many people do that normally in this kind of code. We
could add a note in the wiki that it will set the auto commit to false
on the connection it obtains.

Alternatively we can remember the old value and set it back in case we
*changed* the option.


On Tue, Apr 26, 2011 at 9:58 PM,  <cm...@apache.org> wrote:
> Author: cmueller
> Date: Tue Apr 26 19:58:17 2011
> New Revision: 1096880
>
> URL: http://svn.apache.org/viewvc?rev=1096880&view=rev
> Log:
> CAMEL-3803: Component camel-jdbc does not support Transactions
> Thanks Heath Kesler for the patch
>
> Modified:
>    camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
>    camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
>    camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
>    camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
>    camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
>
> Modified: camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java?rev=1096880&r1=1096879&r2=1096880&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java (original)
> +++ camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcEndpoint.java Tue Apr 26 19:58:17 2011
> @@ -30,6 +30,7 @@ import org.apache.camel.impl.DefaultEndp
>  */
>  public class JdbcEndpoint extends DefaultEndpoint {
>     private int readSize;
> +    private boolean transacted;
>     private DataSource dataSource;
>     private Map<String, Object> parameters;
>     private boolean useJDBC4ColumnNameAndLabelSemantics = true;
> @@ -51,7 +52,7 @@ public class JdbcEndpoint extends Defaul
>     }
>
>     public Producer createProducer() throws Exception {
> -        return new JdbcProducer(this, dataSource, readSize, parameters);
> +        return new JdbcProducer(this, dataSource, readSize,  parameters);
>     }
>
>     public int getReadSize() {
> @@ -62,6 +63,14 @@ public class JdbcEndpoint extends Defaul
>         this.readSize = readSize;
>     }
>
> +    public boolean isTransacted() {
> +        return transacted;
> +    }
> +
> +    public void setTransacted(boolean transacted) {
> +        this.transacted = transacted;
> +    }
> +
>     public DataSource getDataSource() {
>         return dataSource;
>     }
>
> Modified: camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java?rev=1096880&r1=1096879&r2=1096880&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java (original)
> +++ camel/trunk/components/camel-jdbc/src/main/java/org/apache/camel/component/jdbc/JdbcProducer.java Tue Apr 26 19:58:17 2011
> @@ -62,10 +62,13 @@ public class JdbcProducer extends Defaul
>         Connection conn = null;
>         Statement stmt = null;
>         ResultSet rs = null;
> +
>         try {
>             conn = dataSource.getConnection();
> +            conn.setAutoCommit(false);
> +
>             stmt = conn.createStatement();
> -
> +
>             if (parameters != null && !parameters.isEmpty()) {
>                 IntrospectionSupport.setProperties(stmt, parameters);
>             }
> @@ -81,26 +84,54 @@ public class JdbcProducer extends Defaul
>                 int updateCount = stmt.getUpdateCount();
>                 exchange.getOut().setHeader(JdbcConstants.JDBC_UPDATE_COUNT, updateCount);
>             }
> -        } finally {
> -            try {
> -                if (rs != null) {
> -                    rs.close();
> -                }
> -                if (stmt != null) {
> -                    stmt.close();
> -                }
> -                if (conn != null) {
> -                    conn.close();
> -                }
> -            } catch (SQLException e) {
> -                LOG.warn("Error closing JDBC resource: " + e, e);
> +            conn.commit();
> +        } catch (Exception e){
> +            try{
> +                conn.rollback();
> +            } catch (SQLException sqle){
> +                LOG.warn("Error on jdbc component rollback: " + sqle, sqle);
>             }
> +            throw e;
> +        } finally {
> +            closeQuietly(rs);
> +            closeQuietly(stmt);
> +            closeQuietly(conn);
>         }
>
>         // populate headers
>         exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
>     }
>
> +    private void closeQuietly(ResultSet rs) {
> +        if (rs != null) {
> +            try{
> +                rs.close();
> +            } catch (SQLException sqle){
> +                LOG.warn("Error by closing result set: " + sqle, sqle);
> +            }
> +        }
> +    }
> +
> +    private void closeQuietly(Statement stmt) {
> +        if (stmt != null) {
> +            try{
> +                stmt.close();
> +            } catch (SQLException sqle){
> +                LOG.warn("Error by closing statement: " + sqle, sqle);
> +            }
> +        }
> +    }
> +
> +    private void closeQuietly(Connection con) {
> +        if (con != null) {
> +            try{
> +                con.close();
> +            } catch (SQLException sqle){
> +                LOG.warn("Error by closing connection: " + sqle, sqle);
> +            }
> +        }
> +    }
> +
>     /**
>      * Sets the result from the ResultSet to the Exchange as its OUT body.
>      */
>
> Modified: camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java (original)
> +++ camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcOptionsTest.java Tue Apr 26 19:58:17 2011
> @@ -21,6 +21,7 @@ import java.util.List;
>
>  import javax.sql.DataSource;
>
> +import org.apache.camel.CamelExecutionException;
>  import org.apache.camel.ResolveEndpointFailedException;
>  import org.apache.camel.builder.RouteBuilder;
>  import org.apache.camel.component.mock.MockEndpoint;
> @@ -39,6 +40,7 @@ public class JdbcOptionsTest extends Cam
>     private String password = "";
>     private DataSource ds;
>
> +    @SuppressWarnings("rawtypes")
>     @Test
>     public void testReadSize() throws Exception {
>         MockEndpoint mock = getMockEndpoint("mock:result");
> @@ -52,6 +54,58 @@ public class JdbcOptionsTest extends Cam
>         assertEquals(1, list.size());
>     }
>
> +    @SuppressWarnings("rawtypes")
> +    @Test
> +    public void testInsertCommitO() throws Exception {
> +        MockEndpoint mock = getMockEndpoint("mock:resultTx");
> +        mock.expectedMessageCount(1);
> +        // insert 2 recs into table
> +        template.sendBody("direct:startTx", "insert into customer values ('cust3', 'johnsmith');insert into customer values ('cust4', 'hkesler') ");
> +
> +        mock.assertIsSatisfied();
> +
> +        String body = mock.getExchanges().get(0).getIn().getBody(String.class);
> +        assertNull(body);
> +
> +        // now test to see that they were inserted and committed properly
> +        MockEndpoint mockTest = getMockEndpoint("mock:retrieve");
> +        mockTest.expectedMessageCount(1);
> +
> +        template.sendBody("direct:retrieve", "select * from customer");
> +
> +        mockTest.assertIsSatisfied();
> +
> +        List list = mockTest.getExchanges().get(0).getIn().getBody(ArrayList.class);
> +        // both records were committed
> +        assertEquals(4, list.size());
> +    }
> +
> +    @SuppressWarnings("rawtypes")
> +    @Test
> +    public void testInsertRollback() throws Exception {
> +        // insert 2 records
> +        try{
> +            template.sendBody("direct:startTx", "insert into customer values ('cust3', 'johnsmith');insert into customer values ('cust3', 'hkesler')");
> +            fail("Should have thrown a CamelExecutionException");
> +        } catch (CamelExecutionException e) {
> +            if (!e.getCause().getMessage().contains("Violation of unique constraint")) {
> +                fail("Test did not throw the expected Constraint Violation Exception");
> +            }
> +        }
> +
> +        // check to see that they failed by getting a rec count from table
> +        MockEndpoint mockTest = getMockEndpoint("mock:retrieve");
> +        mockTest.expectedMessageCount(1);
> +
> +        template.sendBody("direct:retrieve", "select * from customer");
> +
> +        mockTest.assertIsSatisfied();
> +
> +        List list = mockTest.getExchanges().get(0).getIn().getBody(ArrayList.class);
> +        // all recs failed to insert
> +        assertEquals(2, list.size());
> +    }
> +
>     @Test
>     public void testNoDataSourceInRegistry() throws Exception {
>         try {
> @@ -73,6 +127,8 @@ public class JdbcOptionsTest extends Cam
>         return new RouteBuilder() {
>             public void configure() throws Exception {
>                 from("direct:start").to("jdbc:testdb?readSize=1").to("mock:result");
> +                from("direct:retrieve").to("jdbc:testdb").to("mock:retrieve");
> +                from("direct:startTx").to("jdbc:testdb?transacted=true").to("mock:resultTx");
>             }
>         };
>     }
> @@ -84,7 +140,7 @@ public class JdbcOptionsTest extends Cam
>         ds = dataSource;
>
>         JdbcTemplate jdbc = new JdbcTemplate(ds);
> -        jdbc.execute("create table customer (id varchar(15), name varchar(10))");
> +        jdbc.execute("create table customer (id varchar(15) PRIMARY KEY, name varchar(10))");
>         jdbc.execute("insert into customer values('cust1','jstrachan')");
>         jdbc.execute("insert into customer values('cust2','nsandhu')");
>         super.setUp();
>
> Modified: camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java (original)
> +++ camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcProducerConcurrenctTest.java Tue Apr 26 19:58:17 2011
> @@ -55,14 +55,15 @@ public class JdbcProducerConcurrenctTest
>         doSendMessages(10, 5);
>     }
>
> +    @SuppressWarnings("rawtypes")
>     private void doSendMessages(int files, int poolSize) throws Exception {
>         getMockEndpoint("mock:result").expectedMessageCount(files);
>
>         ExecutorService executor = Executors.newFixedThreadPool(poolSize);
> -        Map<Integer, Future> responses = new ConcurrentHashMap<Integer, Future>();
> +        Map<Integer, Future<Object>> responses = new ConcurrentHashMap<Integer, Future<Object>>();
>         for (int i = 0; i < files; i++) {
>             final int index = i;
> -            Future out = executor.submit(new Callable<Object>() {
> +            Future<Object> out = executor.submit(new Callable<Object>() {
>                 public Object call() throws Exception {
>                     int id = index % 2;
>                     return template.requestBody("direct:start", "select * from customer where id = " + id);
>
> Modified: camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java
> URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java?rev=1096880&r1=1096879&r2=1096880&view=diff
> ==============================================================================
> --- camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java (original)
> +++ camel/trunk/components/camel-jdbc/src/test/java/org/apache/camel/component/jdbc/JdbcStatementParametersTest.java Tue Apr 26 19:58:17 2011
> @@ -38,6 +38,7 @@ public class JdbcStatementParametersTest
>     private String user = "sa";
>     private String password = "";
>
> +    @SuppressWarnings("rawtypes")
>     @Test
>     public void testMax2Rows() throws Exception {
>         List rows = template.requestBody("direct:hello", "select * from customer order by id", List.class);
> @@ -46,6 +47,7 @@ public class JdbcStatementParametersTest
>         assertEquals(2, context.getEndpoints().size());
>     }
>
> +    @SuppressWarnings("rawtypes")
>     @Test
>     public void testMax5Rows() throws Exception {
>         List rows = template.requestBody("jdbc:testdb?statement.maxRows=5&statement.fetchSize=50", "select * from customer order by id", List.class);
> @@ -54,6 +56,7 @@ public class JdbcStatementParametersTest
>         assertEquals(3, context.getEndpoints().size());
>     }
>
> +    @SuppressWarnings("rawtypes")
>     @Test
>     public void testNoParameters() throws Exception {
>         List rows = template.requestBody("jdbc:testdb", "select * from customer order by id", List.class);
>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
CamelOne 2011: http://fusesource.com/camelone2011/
Twitter: davsclaus
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/