You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/24 14:19:19 UTC
git commit: CAMEL-6144: Added support for optimistic locking in
camel-hawtdb
Updated Branches:
refs/heads/master 12186266c -> 0bee1b227
CAMEL-6144: Added support for optimistic locking in camel-hawtdb
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0bee1b22
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0bee1b22
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0bee1b22
Branch: refs/heads/master
Commit: 0bee1b227206516a705337dbeddddf9e807b439b
Parents: 1218626
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 24 14:17:17 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 24 14:17:17 2013 +0200
----------------------------------------------------------------------
.../hawtdb/HawtDBAggregationRepository.java | 20 +++++++++++++---
.../camel/component/hawtdb/HawtDBFile.java | 24 ++++++++++++++------
2 files changed, 34 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0bee1b22/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
index 93b5082..93b3435 100644
--- a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
+++ b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
@@ -27,11 +27,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdb.api.OptimisticUpdateException;
import org.fusesource.hawtdb.api.SortedIndex;
import org.fusesource.hawtdb.api.Transaction;
import org.slf4j.Logger;
@@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
/**
* An instance of AggregationRepository which is backed by a HawtDB.
*/
-public class HawtDBAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository {
+public class HawtDBAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
private static final transient Logger LOG = LoggerFactory.getLogger(HawtDBAggregationRepository.class);
private HawtDBFile hawtDBFile;
@@ -100,6 +102,18 @@ public class HawtDBAggregationRepository extends ServiceSupport implements Recov
}
public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) {
+ return doAdd(camelContext, key, exchange, true);
+ }
+
+ public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException {
+ try {
+ return doAdd(camelContext, key, newExchange, false);
+ } catch (OptimisticUpdateException e) {
+ throw new OptimisticLockingException();
+ }
+ }
+
+ protected Exchange doAdd(final CamelContext camelContext, final String key, final Exchange exchange, final boolean handleOptimisticLockingException) {
LOG.debug("Adding key [{}] -> {}", key, exchange);
try {
// If we could guarantee that the key and exchange are immutable,
@@ -121,7 +135,7 @@ public class HawtDBAggregationRepository extends ServiceSupport implements Recov
public String toString() {
return "Adding key [" + key + "]";
}
- });
+ }, handleOptimisticLockingException);
if (rc == null) {
return null;
}
@@ -255,7 +269,6 @@ public class HawtDBAggregationRepository extends ServiceSupport implements Recov
}
}
return null;
-
}
@Override
@@ -336,6 +349,7 @@ public class HawtDBAggregationRepository extends ServiceSupport implements Recov
return "Recovering exchangeId [" + exchangeId + "]";
}
});
+
if (rc != null) {
answer = codec.unmarshallExchange(camelContext, rc);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0bee1b22/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
----------------------------------------------------------------------
diff --git a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
index 4114966..3b0b643 100644
--- a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
+++ b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
@@ -97,7 +97,7 @@ public class HawtDBFile extends TxPageFileFactory implements Service {
public String toString() {
return "Allocation repository file: " + getFile();
}
- });
+ }, true);
}
public void stop() {
@@ -113,10 +113,14 @@ public class HawtDBFile extends TxPageFileFactory implements Service {
}
public <T> T execute(Work<T> work) {
+ return execute(work, true);
+ }
+
+ public <T> T execute(Work<T> work, boolean rollbackOnOptimisticUpdateException) {
LOG.trace("Executing work +++ start +++ {}", work);
Transaction tx = pageFile.tx();
- T answer = doExecute(work, tx, pageFile);
+ T answer = doExecute(work, tx, pageFile, rollbackOnOptimisticUpdateException);
LOG.trace("Executing work +++ done +++ {}", work);
return answer;
@@ -148,7 +152,7 @@ public class HawtDBFile extends TxPageFileFactory implements Service {
return answer;
}
- private static <T> T doExecute(Work<T> work, Transaction tx, TxPageFile page) {
+ private static <T> T doExecute(Work<T> work, Transaction tx, TxPageFile page, boolean handleOptimisticLockingException) {
T answer = null;
boolean done = false;
@@ -172,10 +176,16 @@ public class HawtDBFile extends TxPageFileFactory implements Service {
// and we are done
done = true;
} catch (OptimisticUpdateException e) {
- // retry as we hit an optimistic update error
- LOG.warn("OptimisticUpdateException occurred at attempt " + attempt + " executing work " + work + ". Will do rollback and retry.");
- // no harm doing rollback before retry and no wait is needed
- tx.rollback();
+ if (handleOptimisticLockingException) {
+ // retry as we hit an optimistic update error
+ LOG.warn("OptimisticUpdateException occurred at attempt " + attempt + " executing work " + work + ". Will do rollback and retry.");
+ // no harm doing rollback before retry and no wait is needed
+ tx.rollback();
+ } else {
+ // we must rollback and rethrow
+ tx.rollback();
+ throw e;
+ }
} catch (RuntimeException e) {
LOG.warn("Error executing work " + work + ". Will do rollback.", e);
tx.rollback();