You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/12/23 07:11:19 UTC
svn commit: r1222582 - in
/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc:
AbstractJdbcMessageIdRepository.java JdbcMessageIdRepository.java
Author: davsclaus
Date: Fri Dec 23 06:11:18 2011
New Revision: 1222582
URL: http://svn.apache.org/viewvc?rev=1222582&view=rev
Log:
CAMEL-4822: Introduced AbstractJdbcMessageIdRepository to make it easier to implement custom JDBC repo, to control the SQL being used. Thanks to Philip Glebow for the patch.
Added:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
Added: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java?rev=1222582&view=auto
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java (added)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/AbstractJdbcMessageIdRepository.java Fri Dec 23 06:11:18 2011
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.jdbc;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.spi.IdempotentRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * Base class for JDBC-based idempotent repositories that allows the schema to be changed.
+ * <p/>
+ * Subclasses need only implement theses methods:
+ * <ul>
+ * <li>{@link #queryForInt(T key)}</li>
+ * <li>{@link #insert(T key)}</li>
+ * <li>{@link #delete(T key)}</li>
+ * </ul>
+ * <p/>
+ * These methods should perform the named database operation.
+ */
+@ManagedResource("JDBC IdempotentRepository")
+public abstract class AbstractJdbcMessageIdRepository<T> extends ServiceSupport implements IdempotentRepository<T> {
+
+ protected JdbcTemplate jdbcTemplate;
+ protected String processorName;
+ protected TransactionTemplate transactionTemplate;
+ protected DataSource dataSource;
+
+ public AbstractJdbcMessageIdRepository() {
+ super();
+ }
+
+ public AbstractJdbcMessageIdRepository(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ this.transactionTemplate = transactionTemplate;
+ }
+
+ public AbstractJdbcMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) {
+ this.jdbcTemplate = new JdbcTemplate(dataSource);
+ this.jdbcTemplate.afterPropertiesSet();
+ this.processorName = processorName;
+ this.transactionTemplate = transactionTemplate;
+ }
+
+ public AbstractJdbcMessageIdRepository(DataSource dataSource, String processorName) {
+ this(dataSource, createTransactionTemplate(dataSource), processorName);
+ }
+
+ /**
+ * Operation that returns the number of rows, if any, for the specified key
+ *
+ * @param key the key
+ * @return int number of rows
+ */
+ protected abstract int queryForInt(final T key);
+
+ /**
+ * Operation that inserts the key if it does not already exist
+ *
+ * @param key the key
+ * @return int number of rows inserted
+ */
+ protected abstract int insert(final T key);
+
+ /**
+ * Operations that deletes the key if it exists
+ *
+ * @param key the key
+ * @return int number of rows deleted
+ */
+ protected abstract int delete(final T key);
+
+ /**
+ * Creates the transaction template
+ */
+ protected static TransactionTemplate createTransactionTemplate(DataSource dataSource) {
+ TransactionTemplate transactionTemplate = new TransactionTemplate();
+ transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource));
+ transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
+ return transactionTemplate;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ }
+
+ @ManagedOperation(description = "Adds the key to the store")
+ @Override
+ public boolean add(final T key) {
+ // Run this in single transaction.
+ Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
+ public Boolean doInTransaction(TransactionStatus status) {
+ int count = queryForInt(key);
+ if (count == 0) {
+ insert(key);
+ return Boolean.TRUE;
+ } else {
+ return Boolean.FALSE;
+ }
+ }
+ });
+ return rc.booleanValue();
+ }
+
+ @ManagedOperation(description = "Does the store contain the given key")
+ @Override
+ public boolean contains(final T key) {
+ // Run this in single transaction.
+ Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
+ public Boolean doInTransaction(TransactionStatus status) {
+ int count = queryForInt(key);
+ if (count == 0) {
+ return Boolean.FALSE;
+ } else {
+ return Boolean.TRUE;
+ }
+ }
+ });
+ return rc.booleanValue();
+
+ }
+
+ @ManagedOperation(description = "Remove the key from the store")
+ @Override
+ public boolean remove(final T key) {
+ Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
+ public Boolean doInTransaction(TransactionStatus status) {
+ int updateCount = delete(key);
+ if (updateCount == 0) {
+ return Boolean.FALSE;
+ } else {
+ return Boolean.TRUE;
+ }
+ }
+ });
+ return rc.booleanValue();
+ }
+
+ @Override
+ public boolean confirm(final T key) {
+ return true;
+ }
+
+ public JdbcTemplate getJdbcTemplate() {
+ return jdbcTemplate;
+ }
+
+ public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+ public String getProcessorName() {
+ return processorName;
+ }
+
+ public void setProcessorName(String processorName) {
+ this.processorName = processorName;
+ }
+
+ public TransactionTemplate getTransactionTemplate() {
+ return transactionTemplate;
+ }
+
+ public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
+ this.transactionTemplate = transactionTemplate;
+ }
+
+ public DataSource getDataSource() {
+ return dataSource;
+ }
+
+ public void setDataSource(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+}
Modified: camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java?rev=1222582&r1=1222581&r2=1222582&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java Fri Dec 23 06:11:18 2011
@@ -17,120 +17,49 @@
package org.apache.camel.processor.idempotent.jdbc;
import java.sql.Timestamp;
-
import javax.sql.DataSource;
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.api.management.ManagedOperation;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.spi.IdempotentRepository;
-import org.apache.camel.support.ServiceSupport;
import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
/**
- * @version
+ * Default implementation of {@link AbstractJdbcMessageIdRepository}
*/
-@ManagedResource(description = "JDBC based message id repository")
-public class JdbcMessageIdRepository extends ServiceSupport implements IdempotentRepository<String> {
-
- protected static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
- protected static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)";
- protected static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
-
- private final JdbcTemplate jdbcTemplate;
- private final String processorName;
- private final TransactionTemplate transactionTemplate;
+public class JdbcMessageIdRepository extends AbstractJdbcMessageIdRepository<String> {
+
+ public static final String QUERY_STRING = "SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
+ public static final String INSERT_STRING = "INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?)";
+ public static final String DELETE_STRING = "DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ?";
+
+ public JdbcMessageIdRepository() {
+ super();
+ }
public JdbcMessageIdRepository(DataSource dataSource, String processorName) {
- this(dataSource, createTransactionTemplate(dataSource), processorName);
+ super(dataSource, processorName);
}
public JdbcMessageIdRepository(DataSource dataSource, TransactionTemplate transactionTemplate, String processorName) {
- this.jdbcTemplate = new JdbcTemplate(dataSource);
- this.jdbcTemplate.afterPropertiesSet();
- this.processorName = processorName;
- this.transactionTemplate = transactionTemplate;
- }
-
- public static JdbcMessageIdRepository jpaMessageIdRepository(DataSource dataSource, String processorName) {
- return new JdbcMessageIdRepository(dataSource, processorName);
- }
-
- private static TransactionTemplate createTransactionTemplate(DataSource dataSource) {
- TransactionTemplate transactionTemplate = new TransactionTemplate();
- transactionTemplate.setTransactionManager(new DataSourceTransactionManager(dataSource));
- transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
- return transactionTemplate;
- }
-
- @ManagedOperation(description = "Adds the key to the store")
- public boolean add(final String messageId) {
- // Run this in single transaction.
- Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
- public Boolean doInTransaction(TransactionStatus status) {
- int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId);
- if (count == 0) {
- jdbcTemplate.update(INSERT_STRING, processorName, messageId, new Timestamp(System.currentTimeMillis()));
- return Boolean.TRUE;
- } else {
- return Boolean.FALSE;
- }
- }
- });
- return rc.booleanValue();
- }
-
- @ManagedOperation(description = "Does the store contain the given key")
- public boolean contains(final String messageId) {
- // Run this in single transaction.
- Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
- public Boolean doInTransaction(TransactionStatus status) {
- int count = jdbcTemplate.queryForInt(QUERY_STRING, processorName, messageId);
- if (count == 0) {
- return Boolean.FALSE;
- } else {
- return Boolean.TRUE;
- }
- }
- });
- return rc.booleanValue();
- }
-
- @ManagedOperation(description = "Remove the key from the store")
- public boolean remove(final String messageId) {
- Boolean rc = transactionTemplate.execute(new TransactionCallback<Boolean>() {
- public Boolean doInTransaction(TransactionStatus status) {
- int updateCount = jdbcTemplate.update(DELETE_STRING, processorName, messageId);
- if (updateCount == 0) {
- return Boolean.FALSE;
- } else {
- return Boolean.TRUE;
- }
- }
- });
- return rc.booleanValue();
- }
-
- public boolean confirm(String s) {
- // noop
- return true;
- }
-
- @ManagedAttribute(description = "The processor name")
- public String getProcessorName() {
- return processorName;
+ super(dataSource, transactionTemplate, processorName);
+ }
+
+ public JdbcMessageIdRepository(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) {
+ super(jdbcTemplate, transactionTemplate);
}
@Override
- protected void doStart() throws Exception {
+ protected int queryForInt(String key) {
+ return jdbcTemplate.queryForInt(QUERY_STRING, processorName, key);
}
@Override
- protected void doStop() throws Exception {
+ protected int insert(String key) {
+ return jdbcTemplate.update(INSERT_STRING, processorName, key, new Timestamp(System.currentTimeMillis()));
}
-}
+
+ @Override
+ protected int delete(String key) {
+ return jdbcTemplate.update(DELETE_STRING, processorName, key);
+ }
+
+}
\ No newline at end of file