You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/12/23 07:11:19 UTC

svn commit: r1222582 - in /camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc: AbstractJdbcMessageIdRepository.java JdbcMessageIdRepository.java

Author: davsclaus
Date: Fri Dec 23 06:11:18 2011
New Revision: 1222582

URL: http://svn.apache.org/viewvc?rev=1222582&view=rev
Log:
CAMEL-4822: Introduced AbstractJdbcMessageIdRepository to make it easier to implement custom JDBC repo, to control the SQL being used. Thanks to Philip Glebow for the patch.

Added:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
Modified:
    camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java

Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java?rev=1222582&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java (added)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java Fri Dec 23 06:11:18 2011
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * Base class for JDBC-based idempotent repositories that allows the schema to be changed.
+ * <p/>
+ * Subclasses need only implement theses methods:
+ * <ul>
+ *   <li>{@link #queryForInt(T key)}</li>
+ *   <li>{@link #insert(T key)}</li>
+ *   <li>{@link #delete(T key)}</li>
+ * </ul>
+ * <p/>
+ * These methods should perform the named database operation.
+ */
+@ManagedResource("JDBC IdempotentRepository")
+public abstract class AbstractJdbcMessageIdRepository<T> extends ServiceSupport implements IdempotentRepository<T> {
+
+    protected JdbcTemplate jdbcTemplate;
+    protected String processorName;
+    protected TransactionTemplate transactionTemplate;
+    protected DataSource dataSource;
+
+    public AbstractJdbcMessageIdRepository() {
+        super();
+    }
+
+    public AbstractJdbcMessageIdRepository(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+        this.transactionTemplate = transactionTemplate;
+    }
+
+    public AbstractJdbcMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) {
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
+        this.jdbcTemplate.afterPropertiesSet();
+        this.processorName = processorName;
+        this.transactionTemplate = transactionTemplate;
+    }
+
+    public AbstractJdbcMessageIdRepository(DataSource dataSource, String processorName) {
+        this(dataSource, createTransactionTemplate(dataSource), processorName);
+    }
+
+    /**
+     * Operation that returns the number of rows, if any, for the specified key
+     *
+     * @param key  the key
+     * @return int number of rows
+     */
+    protected abstract int queryForInt(final T key);
+
+    /**
+     * Operation that inserts the key if it does not already exist
+     *
+     * @param key  the key
+     * @return int number of rows inserted
+     */
+    protected abstract int insert(final T key);
+
+    /**
+     * Operations that deletes the key if it exists
+     *
+     * @param key  the key
+     * @return int number of rows deleted
+     */
+    protected abstract int delete(final T key);
+
+    /**
+     * Creates the transaction template
+     */
+    protected static TransactionTemplate createTransactionTemplate(DataSource dataSource) {
+        TransactionTemplate transactionTemplate = new TransactionTemplate();
+        transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource));
+        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+        return transactionTemplate;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+    }
+
+    @ManagedOperation(description = "Adds the key to the store")
+    @Override
+    public boolean add(final T key) {
+        // Run this in single transaction.
+        Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
+            public Boolean doInTransaction(TransactionStatus status) {
+                int count = queryForInt(key);
+                if (count == 0) {
+                    insert(key);
+                    return Boolean.TRUE;
+                } else {
+                    return Boolean.FALSE;
+                }
+            }
+        });
+        return rc.booleanValue();
+    }
+
+    @ManagedOperation(description = "Does the store contain the given key")
+    @Override
+    public boolean contains(final T key) {
+        // Run this in single transaction.
+        Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
+            public Boolean doInTransaction(TransactionStatus status) {
+                int count = queryForInt(key);
+                if (count == 0) {
+                    return Boolean.FALSE;
+                } else {
+                    return Boolean.TRUE;
+                }
+            }
+        });
+        return rc.booleanValue();
+
+    }
+
+    @ManagedOperation(description = "Remove the key from the store")
+    @Override
+    public boolean remove(final T key) {
+        Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
+            public Boolean doInTransaction(TransactionStatus status) {
+                int updateCount = delete(key);
+                if (updateCount == 0) {
+                    return Boolean.FALSE;
+                } else {
+                    return Boolean.TRUE;
+                }
+            }
+        });
+        return rc.booleanValue();
+    }
+
+    @Override
+    public boolean confirm(final T key) {
+        return true;
+    }
+
+    public JdbcTemplate getJdbcTemplate() {
+        return jdbcTemplate;
+    }
+
+    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+    }
+
+    public String getProcessorName() {
+        return processorName;
+    }
+
+    public void setProcessorName(String processorName) {
+        this.processorName = processorName;
+    }
+
+    public TransactionTemplate getTransactionTemplate() {
+        return transactionTemplate;
+    }
+
+    public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
+        this.transactionTemplate = transactionTemplate;
+    }
+
+    public DataSource getDataSource() {
+        return dataSource;
+    }
+
+    public void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+}

Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java?rev=1222582&r1=1222581&r2=1222582&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java Fri Dec 23 06:11:18 2011
@@ -17,120 +17,49 @@
 package org.apache.camel.processor.idempotent.jdbc;
 
 import java.sql.Timestamp;
-
 import javax.sql.DataSource;
 
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.api.management.ManagedOperation;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.spi.IdempotentRepository;
-import org.apache.camel.support.ServiceSupport;
 import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.transaction.support.TransactionTemplate;
 
 /**
- * @version 
+ * Default implementation of {@link AbstractJdbcMessageIdRepository}
  */
-@ManagedResource(description = "JDBC based message id repository")
-public class JdbcMessageIdRepository extends ServiceSupport implements IdempotentRepository<String> {
-    
-    protected static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
-    protected static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)";
-    protected static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
-    
-    private final JdbcTemplate jdbcTemplate;
-    private final String processorName;
-    private final TransactionTemplate transactionTemplate;
+public class JdbcMessageIdRepository extends AbstractJdbcMessageIdRepository<String> {
+
+    public static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
+    public static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)";
+    public static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
+
+    public JdbcMessageIdRepository() {
+        super();
+    }
 
     public JdbcMessageIdRepository(DataSource dataSource, String processorName) {
-        this(dataSource, createTransactionTemplate(dataSource), processorName);
+        super(dataSource, processorName);
     }
 
     public JdbcMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) {
-        this.jdbcTemplate = new JdbcTemplate(dataSource);
-        this.jdbcTemplate.afterPropertiesSet();
-        this.processorName = processorName;
-        this.transactionTemplate = transactionTemplate;
-    }
-
-    public static JdbcMessageIdRepository jpaMessageIdRepository(DataSource dataSource, String processorName) {
-        return new JdbcMessageIdRepository(dataSource, processorName);
-    }
-
-    private static TransactionTemplate createTransactionTemplate(DataSource dataSource) {
-        TransactionTemplate transactionTemplate = new TransactionTemplate();
-        transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource));
-        transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
-        return transactionTemplate;
-    }
-
-    @ManagedOperation(description = "Adds the key to the store")
-    public boolean add(final String messageId) {
-        // Run this in single transaction.
-        Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
-            public Boolean doInTransaction(TransactionStatus status) {
-                int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId);
-                if (count == 0) {
-                    jdbcTemplate.update(INSERT_STRING, processorName, messageId, new Timestamp(System.currentTimeMillis()));
-                    return Boolean.TRUE;
-                } else {
-                    return Boolean.FALSE;
-                }
-            }
-        });
-        return rc.booleanValue();
-    }
-
-    @ManagedOperation(description = "Does the store contain the given key")
-    public boolean contains(final String messageId) {
-        // Run this in single transaction.
-        Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
-            public Boolean doInTransaction(TransactionStatus status) {
-                int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId);
-                if (count == 0) {
-                    return Boolean.FALSE;
-                } else {
-                    return Boolean.TRUE;
-                }
-            }
-        });
-        return rc.booleanValue();
-    }
-
-    @ManagedOperation(description = "Remove the key from the store")
-    public boolean remove(final String messageId) {
-        Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
-            public Boolean doInTransaction(TransactionStatus status) {
-                int updateCount = jdbcTemplate.update(DELETE_STRING, processorName, messageId);
-                if (updateCount == 0) {
-                    return Boolean.FALSE;
-                } else {
-                    return Boolean.TRUE;
-                }
-            }
-        });
-        return rc.booleanValue();
-    }
-
-    public boolean confirm(String s) {
-        // noop
-        return true;
-    }
-
-    @ManagedAttribute(description = "The processor name")
-    public String getProcessorName() {
-        return processorName;
+        super(dataSource, transactionTemplate, processorName);
+    }
+
+    public JdbcMessageIdRepository(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) {
+        super(jdbcTemplate, transactionTemplate);
     }
 
     @Override
-    protected void doStart() throws Exception {
+    protected int queryForInt(String key) {
+        return jdbcTemplate.queryForInt(QUERY_STRING, processorName, key);
     }
 
     @Override
-    protected void doStop() throws Exception {
+    protected int insert(String key) {
+        return jdbcTemplate.update(INSERT_STRING, processorName, key, new Timestamp(System.currentTimeMillis()));
     }
-}
+
+    @Override
+    protected int delete(String key) {
+        return jdbcTemplate.update(DELETE_STRING, processorName, key);
+    }
+
+}
\ No newline at end of file