You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/24 13:40:49 UTC
git commit: CAMEL-6144: Added support for optimistic locking in JDBC
based aggregation repository.
Updated Branches:
refs/heads/master df4b8bb3c -> 0be323f0d
CAMEL-6144: Added support for optimistic locking in JDBC based aggregation repository.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0be323f0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0be323f0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0be323f0
Branch: refs/heads/master
Commit: 0be323f0d71a2b6aee00d1b64467008fb0a57a96
Parents: df4b8bb
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 24 13:40:33 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 24 13:40:33 2013 +0200
----------------------------------------------------------------------
...ultJdbcOptimisticLockingExceptionMapper.java | 67 ++++++++++++++++++++
.../jdbc/JdbcAggregationRepository.java | 27 +++++++-
.../JdbcOptimisticLockingExceptionMapper.java | 33 ++++++++++
.../aggregate/jdbc/JdbcSpringAggregateTest.xml | 2 +-
4 files changed, 127 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0be323f0/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/DefaultJdbcOptimisticLockingExceptionMapper.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/DefaultJdbcOptimisticLockingExceptionMapper.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/DefaultJdbcOptimisticLockingExceptionMapper.java
new file mode 100644
index 0000000..805397f
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/DefaultJdbcOptimisticLockingExceptionMapper.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * A default {@link JdbcOptimisticLockingExceptionMapper} which checks whether the caused exception (or any of
+ * its nested causes) has <tt>ConstraintViolation</tt> in its class name. If there is such a class name
+ * then {@link #isOptimisticLocking(Exception)} returns <tt>true</tt>.
+ * <p/>
+ * In addition you can add FQN class names using the {@link #addClassName(String)} or {@link #setClassNames(java.util.Set)}
+ * methods. These class names are also matched (exact match), which allows adding vendor specific exception classes.
+ */
+public class DefaultJdbcOptimisticLockingExceptionMapper implements JdbcOptimisticLockingExceptionMapper {
+
+ private final Set<String> classNames = new LinkedHashSet<String>();
+
+ @Override
+ public boolean isOptimisticLocking(Exception cause) {
+ Iterator<Throwable> it = ObjectHelper.createExceptionIterator(cause);
+ while (it.hasNext()) {
+ String name = it.next().getClass().getName();
+ if (name.contains("ConstraintViolation") || hasClassName(name)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean hasClassName(String name) {
+ for (String className : classNames) {
+ if (className.equals(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void addClassName(String name) {
+ classNames.add(name);
+ }
+
+ public void setClassNames(Set<String> names) {
+ classNames.clear();
+ classNames.addAll(names);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0be323f0/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 87c6b15..bf8394d 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -28,6 +28,7 @@ import javax.sql.DataSource;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
@@ -50,12 +51,13 @@ import org.springframework.transaction.support.TransactionTemplate;
/**
* JDBC based {@link org.apache.camel.spi.AggregationRepository}
*/
-public class JdbcAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository {
+public class JdbcAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
private static final transient Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepository.class);
private static final String ID = "id";
private static final String EXCHANGE = "exchange";
private static final String BODY = "body";
+ private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper();
private PlatformTransactionManager transactionManager;
private DataSource dataSource;
private TransactionTemplate transactionTemplate;
@@ -112,6 +114,21 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
}
@Override
+ public Exchange add(final CamelContext camelContext, final String correlationId,
+ final Exchange oldExchange, final Exchange newExchange) throws OptimisticLockingException {
+
+ try {
+ return add(camelContext, correlationId, newExchange);
+ } catch (Exception e) {
+ if (jdbcOptimisticLockingExceptionMapper != null && jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
+ throw new OptimisticLockingException();
+ } else {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+ }
+
+ @Override
public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) {
return transactionTemplate.execute(new TransactionCallback<Exchange>() {
@@ -425,6 +442,14 @@ public class JdbcAggregationRepository extends ServiceSupport implements Recover
this.lobHandler = lobHandler;
}
+ public JdbcOptimisticLockingExceptionMapper getJdbcOptimisticLockingExceptionMapper() {
+ return jdbcOptimisticLockingExceptionMapper;
+ }
+
+ public void setJdbcOptimisticLockingExceptionMapper(JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper) {
+ this.jdbcOptimisticLockingExceptionMapper = jdbcOptimisticLockingExceptionMapper;
+ }
+
public String getRepositoryName() {
return repositoryName;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0be323f0/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcOptimisticLockingExceptionMapper.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcOptimisticLockingExceptionMapper.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcOptimisticLockingExceptionMapper.java
new file mode 100644
index 0000000..6a57d8f
--- /dev/null
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcOptimisticLockingExceptionMapper.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+/**
+ * Mapper allowing different JDBC vendors to be mapped with vendor specific error codes
+ * to an {@link JdbcAggregationRepository.OptimisticLockingException}.
+ */
+public interface JdbcOptimisticLockingExceptionMapper {
+
+ /**
+ * Checks whether the caused exception is to be considered an {@link JdbcAggregationRepository.OptimisticLockingException}.
+ *
+ * @param cause the caused exception
+ * @return <tt>true</tt> if the cause should be rethrown as an {@link JdbcAggregationRepository.OptimisticLockingException}.
+ */
+ boolean isOptimisticLocking(Exception cause);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0be323f0/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml
index c28f259..8c9457f 100644
--- a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml
+++ b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringAggregateTest.xml
@@ -36,7 +36,7 @@
<route>
<from uri="direct:start"/>
<!-- aggregate using our strategy and jdbc repo, and complete when we have 5 messages aggregated -->
- <aggregate strategyRef="myAggregatorStrategy" aggregationRepositoryRef="repo1" completionSize="5">
+ <aggregate strategyRef="myAggregatorStrategy" aggregationRepositoryRef="repo1" completionSize="5" optimisticLocking="true">
<!-- correlate by header with the key id -->
<correlationExpression><header>id</header></correlationExpression>
<!-- send aggregated messages to the mock endpoint -->