You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/24 13:40:49 UTC

git commit: CAMEL-6144: Added support for optimistic locking in JDBC based aggregation repository.

Updated Branches:
  refs/heads/master df4b8bb3c -> 0be323f0d


CAMEL-6144: Added support for optimistic locking in JDBC based aggregation repository.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0be323f0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0be323f0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0be323f0

Branch: refs/heads/master
Commit: 0be323f0d71a2b6aee00d1b64467008fb0a57a96
Parents: df4b8bb
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 24 13:40:33 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 24 13:40:33 2013 +0200

----------------------------------------------------------------------
 ...ultJdbcOptimisticLockingExceptionMapper.java | 67 ++++++++++++++++++++
 .../jdbc/JdbcAggregationRepository.java         | 27 +++++++-
 .../JdbcOptimisticLockingExceptionMapper.java   | 33 ++++++++++
 .../aggregate/jdbc/JdbcSpringAggregateTest.xml  |  2 +-
 4 files changed, 127 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0be323f0/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/DefaultJdbcOptimisticLockingExceptionMapper.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/DefaultJdbcOptimisticLockingExceptionMapper.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/DefaultJdbcOptimisticLockingExceptionMapper.java
new file mode 100644
index 0000000..805397f
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/DefaultJdbcOptimisticLockingExceptionMapper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * A default {@link JdbcOptimisticLockingExceptionMapper} which checks whether the caused exception (or any of
+ * its nested causes) has <tt>ConstraintViolation</tt> in its class name. If there is such a class name
+ * then {@link #isOptimisticLocking(Exception)} returns <tt>true</tt>.
+ * <p/>
+ * In addition you can add FQN class names using the {@link #addClassName(String)} or {@link #setClassNames(java.util.Set)}
+ * methods. These class names are also matched, which allows adding vendor-specific exception classes.
+ */
+public class DefaultJdbcOptimisticLockingExceptionMapper implements JdbcOptimisticLockingExceptionMapper {
+
+    private final Set<String> classNames = new LinkedHashSet<String>();
+
+    @Override
+    public boolean isOptimisticLocking(Exception cause) {
+        Iterator<Throwable> it = ObjectHelper.createExceptionIterator(cause);
+        while (it.hasNext()) {
+            String name = it.next().getClass().getName();
+            if (name.contains("ConstraintViolation") || hasClassName(name)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private boolean hasClassName(String name) {
+        for (String className : classNames) {
+            if (className.equals(name)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void addClassName(String name) {
+        classNames.add(name);
+    }
+
+    public void setClassNames(Set<String> names) {
+        classNames.clear();
+        classNames.addAll(names);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0be323f0/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 87c6b15..bf8394d 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -28,6 +28,7 @@ import javax.sql.DataSource;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
@@ -50,12 +51,13 @@ import org.springframework.transaction.support.TransactionTemplate;
 /**
  * JDBC based {@link org.apache.camel.spi.AggregationRepository}
  */
-public class JdbcAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository {
+public class JdbcAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class);
     private static final String ID = "id";
     private static final String EXCHANGE = "exchange";
     private static final String BODY = "body";
+    private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper();
     private PlatformTransactionManager transactionManager;
     private DataSource dataSource;
     private TransactionTemplate transactionTemplate;
@@ -112,6 +114,21 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
     }
 
     @Override
+    public Exchange add(final CamelContext camelContext, final String correlationId,
+                        final Exchange oldExchange, final Exchange newExchange) throws OptimisticLockingException {
+
+        try {
+            return add(camelContext, correlationId, newExchange);
+        } catch (Exception e) {
+            if (jdbcOptimisticLockingExceptionMapper != null && jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
+                throw new OptimisticLockingException();
+            } else {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            }
+        }
+    }
+
+    @Override
     public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
         return transactionTemplate.execute(new TransactionCallback<Exchange>() {
 
@@ -425,6 +442,14 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
         this.lobHandler = lobHandler;
     }
 
+    public JdbcOptimisticLockingExceptionMapper getJdbcOptimisticLockingExceptionMapper() {
+        return jdbcOptimisticLockingExceptionMapper;
+    }
+
+    public void setJdbcOptimisticLockingExceptionMapper(JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper) {
+        this.jdbcOptimisticLockingExceptionMapper = jdbcOptimisticLockingExceptionMapper;
+    }
+
     public String getRepositoryName() {
         return repositoryName;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/0be323f0/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcOptimisticLockingExceptionMapper.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcOptimisticLockingExceptionMapper.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcOptimisticLockingExceptionMapper.java
new file mode 100644
index 0000000..6a57d8f
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcOptimisticLockingExceptionMapper.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+/**
+ * Mapper allowing different JDBC vendors to be mapped with vendor specific error codes
+ * to an {@link JdbcAggregationRepository.OptimisticLockingException}.
+ */
+public interface JdbcOptimisticLockingExceptionMapper {
+
+    /**
+     * Checks whether the caused exception is to be considered an {@link JdbcAggregationRepository.OptimisticLockingException}.
+     *
+     * @param cause the caused exception
+     * @return <tt>true</tt> if the cause should be rethrown as an {@link JdbcAggregationRepository.OptimisticLockingException}.
+     */
+    boolean isOptimisticLocking(Exception cause);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0be323f0/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml
index c28f259..8c9457f 100644
--- a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml
+++ b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml
@@ -36,7 +36,7 @@
         <route>
             <from uri="direct:start"/>
             <!-- aggregate using our strategy and jdbc repo, and complete when we have 5 messages aggregated -->
-            <aggregate strategyRef="myAggregatorStrategy" aggregationRepositoryRef="repo1" completionSize="5">
+            <aggregate strategyRef="myAggregatorStrategy" aggregationRepositoryRef="repo1" completionSize="5" optimisticLocking="true">
                 <!-- correlate by header with the key id -->
                 <correlationExpression><header>id</header></correlationExpression>
                 <!-- send aggregated messages to the mock endpoint -->