You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/02/09 23:21:18 UTC

svn commit: r1069147 - in /camel/trunk/components/camel-sql: ./ src/main/java/org/apache/camel/processor/ src/main/java/org/apache/camel/processor/idempotent/ src/main/java/org/apache/camel/processor/idempotent/jdbc/ src/test/java/org/apache/camel/proc...

Author: cmueller
Date: Wed Feb  9 22:21:18 2011
New Revision: 1069147

URL: http://svn.apache.org/viewvc?rev=1069147&view=rev
Log:
CAMEL-3648: Provide a JdbcMessageIdRepository for the idempotent consumer EIP

Added:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/
    camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java
    camel/trunk/components/camel-sql/src/test/resources/org/
    camel/trunk/components/camel-sql/src/test/resources/org/apache/
    camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/
    camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/
    camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/
    camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/
    camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml
Modified:
    camel/trunk/components/camel-sql/pom.xml

Modified: camel/trunk/components/camel-sql/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/pom.xml?rev=1069147&r1=1069146&r2=1069147&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/pom.xml (original)
+++ camel/trunk/components/camel-sql/pom.xml Wed Feb  9 22:21:18 2011
@@ -28,33 +28,40 @@
     <packaging>bundle</packaging>
 	<name>Camel :: SQL</name>
 	<description>Camel SQL support</description>
-
-  <properties>
-	<camel.osgi.export.pkg>org.apache.camel.component.sql.*</camel.osgi.export.pkg>
-  </properties>
+	
+	<properties>
+		<camel.osgi.export.pkg>
+			org.apache.camel.component.sql.*;${camel.osgi.version},
+			org.apache.camel.processor.idempotent.jdbc.*;${camel.osgi.version}
+		</camel.osgi.export.pkg>
+		<camel.osgi.import.pkg>
+			!org.apache.camel.component.sql.*,
+			!org.apache.camel.processor.idempotent.jdbc.*,
+			*
+		</camel.osgi.import.pkg>
+	</properties>
 
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.camel</groupId>
 			<artifactId>camel-core</artifactId>
 		</dependency>
+		<!-- to allow Spring annotations (jmx) to be tested -->
+		<dependency>
+			<groupId>org.springframework</groupId>
+			<artifactId>spring-context</artifactId>
+		</dependency>
 		<dependency>
 			<groupId>org.springframework</groupId>
 			<artifactId>spring-jdbc</artifactId>
 		</dependency>
+		
 		<!-- test dependencies -->
 		<dependency>
 			<groupId>org.apache.camel</groupId>
 			<artifactId>camel-test</artifactId>			
 			<scope>test</scope>
 		</dependency>
-		<!-- to allow Spring annotations (jmx) to be tested -->
-		<dependency>
-			<groupId>org.springframework</groupId>
-			<artifactId>spring-context</artifactId>
-			<optional>true</optional>
-			<scope>test</scope>
-		</dependency>
 		<dependency>
 			<groupId>org.springframework</groupId>
 			<artifactId>spring-aop</artifactId>

Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java?rev=1069147&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java (added)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java Wed Feb  9 22:21:18 2011
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.IdempotentRepository;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * @version $Revision$
+ */
+@ManagedResource("JdbcMessageIdRepository")
+public class JdbcMessageIdRepository extends ServiceSupport implements IdempotentRepository<String> {
+    
+    protected static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
+    protected static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId) VALUES (?, ?)";
+    protected static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
+    
+    private final JdbcTemplate jdbcTemplate;
+    private final String processorName;
+    private final TransactionTemplate transactionTemplate;
+
+    public JdbcMessageIdRepository(DataSource dataSource, String processorName) {
+        this(dataSource, createTransactionTemplate(dataSource), processorName);
+    }
+
+    public JdbcMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) {
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
+        this.jdbcTemplate.afterPropertiesSet();
+        this.processorName = processorName;
+        this.transactionTemplate = transactionTemplate;
+    }
+
+    public static JdbcMessageIdRepository jpaMessageIdRepository(DataSource dataSource, String processorName) {
+        return new JdbcMessageIdRepository(dataSource, processorName);
+    }
+
+    private static TransactionTemplate createTransactionTemplate(DataSource dataSource) {
+        TransactionTemplate transactionTemplate = new TransactionTemplate();
+        transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource));
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+        return transactionTemplate;
+    }
+
+    @ManagedOperation(description = "Adds the key to the store")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public boolean add(final String messageId) {
+        // Run this in single transaction.
+        Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() {
+            public Object doInTransaction(TransactionStatus status) {
+                int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId);
+                if (count == 0) {
+                    jdbcTemplate.update(INSERT_STRING, processorName, messageId);
+                    return Boolean.TRUE;
+                } else {
+                    return Boolean.FALSE;
+                }
+            }
+        });
+        return rc.booleanValue();
+    }
+
+    @ManagedOperation(description = "Does the store contain the given key")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public boolean contains(final String messageId) {
+        // Run this in single transaction.
+        Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() {
+            public Object doInTransaction(TransactionStatus status) {
+                int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId);
+                if (count == 0) {
+                    return Boolean.FALSE;
+                } else {
+                    return Boolean.TRUE;
+                }
+            }
+        });
+        return rc.booleanValue();
+    }
+
+    @ManagedOperation(description = "Remove the key from the store")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public boolean remove(final String messageId) {
+        Boolean rc = (Boolean)transactionTemplate.execute(new TransactionCallback() {
+            public Object doInTransaction(TransactionStatus status) {
+                int updateCount = jdbcTemplate.update(DELETE_STRING, processorName, messageId);
+                if (updateCount == 0) {
+                    return Boolean.FALSE;
+                } else {
+                    return Boolean.TRUE;
+                }
+            }
+        });
+        return rc.booleanValue();
+    }
+
+    public boolean confirm(String s) {
+        // noop
+        return true;
+    }
+
+    @ManagedAttribute(description = "The processor name")
+    public String getProcessorName() {
+        return processorName;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+    }
+}

Added: camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java?rev=1069147&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java (added)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java Wed Feb  9 22:21:18 2011
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+
+public class JdbcMessageIdRepositoryTest extends CamelSpringTestSupport {
+
+    protected static final String SELECT_ALL_STRING = "SELECT messageId FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?";
+    protected static final String DELETE_ALL_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ?";
+    protected static final String PROCESSOR_NAME = "myProcessorName";
+
+    protected JdbcTemplate jdbcTemplate;
+    protected DataSource dataSource;
+    
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @EndpointInject(uri = "mock:error")
+    protected MockEndpoint errorEndpoint;
+    
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        
+        dataSource = context.getRegistry().lookup("dataSource", DataSource.class);
+        jdbcTemplate = new JdbcTemplate(dataSource);
+        jdbcTemplate.afterPropertiesSet();
+        
+        setupRepository();
+    }
+    
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/processor/idempotent/jdbc/spring.xml");
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    protected void setupRepository() {
+        TransactionTemplate transactionTemplate = new TransactionTemplate();
+        transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource));
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+        
+        transactionTemplate.execute(new TransactionCallback() {
+            public Object doInTransaction(TransactionStatus status) {
+                try {
+                    jdbcTemplate.execute("CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(20), messageId VARCHAR(10))");
+                } catch (DataAccessException e) {
+                    // noop if table already exists 
+                }
+                jdbcTemplate.update(DELETE_ALL_STRING, PROCESSOR_NAME);
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+    @Test
+    public void testDuplicateMessagesAreFilteredOut() throws Exception {
+        resultEndpoint.expectedBodiesReceived("one", "two", "three");
+        errorEndpoint.expectedMessageCount(0);
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+
+        // all 3 messages should be in jdbc repo
+        List<String> receivedMessageIds = jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME);
+
+        assertEquals(3, receivedMessageIds.size());
+        assertTrue(receivedMessageIds.contains("1"));
+        assertTrue(receivedMessageIds.contains("2"));
+        assertTrue(receivedMessageIds.contains("3"));
+    }
+
+    @Test
+    public void testFailedExchangesNotAdded() throws Exception {
+        RouteBuilder interceptor = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                interceptSendToEndpoint("mock:result")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            String id = exchange.getIn().getHeader("messageId", String.class);
+                            if (id.equals("2")) {
+                                throw new IllegalArgumentException("Damn I cannot handle id 2");
+                            }
+                        }
+                    });
+            }
+        };
+        RouteDefinition routeDefinition = context.getRouteDefinition("JdbcMessageIdRepositoryTest");
+        routeDefinition.adviceWith(context, interceptor);
+
+        // we send in 2 messages with id 2 that fails
+        errorEndpoint.expectedMessageCount(2);
+        resultEndpoint.expectedBodiesReceived("one", "three");
+
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("2", "two");
+        sendMessage("1", "one");
+        sendMessage("3", "three");
+
+        assertMockEndpointsSatisfied();
+
+        // only message 1 and 3 should be in jdbc repo
+        List<String> receivedMessageIds = jdbcTemplate.queryForList(SELECT_ALL_STRING, String.class, PROCESSOR_NAME);
+
+        assertEquals(2, receivedMessageIds.size());
+        assertTrue("Should contain message 1", receivedMessageIds.contains("1"));
+        assertTrue("Should contain message 3", receivedMessageIds.contains("3"));
+    }
+
+    protected void sendMessage(final Object messageId, final Object body) {
+        template.send("direct:start", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(body);
+                in.setHeader("messageId", messageId);
+            }
+        });
+    }
+}
\ No newline at end of file

Added: camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml?rev=1069147&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml (added)
+++ camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml Wed Feb  9 22:21:18 2011
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:camel="http://camel.apache.org/schema/spring"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource">
+        <property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
+        <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/>
+        <property name="username" value="sa"/>
+        <property name="password" value=""/>
+    </bean>
+    
+    <bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
+    	<constructor-arg ref="dataSource" />
+    	<constructor-arg value="myProcessorName" />
+    </bean>
+    
+    <camel:camelContext>
+    	<camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error">
+    		<camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" />
+    	</camel:errorHandler>
+    	
+    	<camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel">
+    		<camel:from uri="direct:start" />
+    		<camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository">
+    			<camel:header>messageId</camel:header>
+    			<camel:to uri="mock:result" />
+    		</camel:idempotentConsumer>
+    	</camel:route>
+    </camel:camelContext>
+</beans>
\ No newline at end of file