You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/07/24 14:19:19 UTC

git commit: CAMEL-6144: Added support for optimistic locking in camel-hawtdb

Updated Branches:
  refs/heads/master 12186266c -> 0bee1b227


CAMEL-6144: Added support for optimistic locking in camel-hawtdb


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0bee1b22
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0bee1b22
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0bee1b22

Branch: refs/heads/master
Commit: 0bee1b227206516a705337dbeddddf9e807b439b
Parents: 1218626
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 24 14:17:17 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 24 14:17:17 2013 +0200

----------------------------------------------------------------------
 .../hawtdb/HawtDBAggregationRepository.java     | 20 +++++++++++++---
 .../camel/component/hawtdb/HawtDBFile.java      | 24 ++++++++++++++------
 2 files changed, 34 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0bee1b22/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
index 93b5082..93b3435 100644
--- a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
+++ b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
@@ -27,11 +27,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtdb.api.OptimisticUpdateException;
 import org.fusesource.hawtdb.api.SortedIndex;
 import org.fusesource.hawtdb.api.Transaction;
 import org.slf4j.Logger;
@@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
 /**
  * An instance of AggregationRepository which is backed by a HawtDB.
  */
-public class HawtDBAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository {
+public class HawtDBAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(HawtDBAggregationRepository.class);
     private HawtDBFile hawtDBFile;
@@ -100,6 +102,18 @@ public class HawtDBAggregationRepository extends ServiceSupport implements Recov
     }
 
     public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) {
+        return doAdd(camelContext, key, exchange, true);
+    }
+
+    public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException {
+        try {
+            return doAdd(camelContext, key, newExchange, false);
+        } catch (OptimisticUpdateException e) {
+            throw new OptimisticLockingException();
+        }
+    }
+
+    protected Exchange doAdd(final CamelContext camelContext, final String key, final Exchange exchange, final boolean handleOptimisticLockingException) {
         LOG.debug("Adding key [{}] -> {}", key, exchange);
         try {
             // If we could guarantee that the key and exchange are immutable,
@@ -121,7 +135,7 @@ public class HawtDBAggregationRepository extends ServiceSupport implements Recov
                 public String toString() {
                     return "Adding key [" + key + "]";
                 }
-            });
+            }, handleOptimisticLockingException);
             if (rc == null) {
                 return null;
             }
@@ -255,7 +269,6 @@ public class HawtDBAggregationRepository extends ServiceSupport implements Recov
                     }
                 }
                 return null;
-
             }
 
             @Override
@@ -336,6 +349,7 @@ public class HawtDBAggregationRepository extends ServiceSupport implements Recov
                     return "Recovering exchangeId [" + exchangeId + "]";
                 }
             });
+
             if (rc != null) {
                 answer = codec.unmarshallExchange(camelContext, rc);
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/0bee1b22/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
----------------------------------------------------------------------
diff --git a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
index 4114966..3b0b643 100644
--- a/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
+++ b/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBFile.java
@@ -97,7 +97,7 @@ public class HawtDBFile extends TxPageFileFactory implements Service {
             public String toString() {
                 return "Allocation repository file: " + getFile();
             }
-        });
+        }, true);
     }
 
     public void stop() {
@@ -113,10 +113,14 @@ public class HawtDBFile extends TxPageFileFactory implements Service {
     }
 
     public <T> T execute(Work<T> work) {
+        return execute(work, true);
+    }
+
+    public <T> T execute(Work<T> work, boolean rollbackOnOptimisticUpdateException) {
         LOG.trace("Executing work +++ start +++ {}", work);
 
         Transaction tx = pageFile.tx();
-        T answer = doExecute(work, tx, pageFile);
+        T answer = doExecute(work, tx, pageFile, rollbackOnOptimisticUpdateException);
 
         LOG.trace("Executing work +++ done  +++ {}", work);
         return answer;
@@ -148,7 +152,7 @@ public class HawtDBFile extends TxPageFileFactory implements Service {
         return answer;
     }
 
-    private static <T> T doExecute(Work<T> work, Transaction tx, TxPageFile page) {
+    private static <T> T doExecute(Work<T> work, Transaction tx, TxPageFile page, boolean handleOptimisticLockingException) {
         T answer = null;
 
         boolean done = false;
@@ -172,10 +176,16 @@ public class HawtDBFile extends TxPageFileFactory implements Service {
                 // and we are done
                 done = true;
             } catch (OptimisticUpdateException e) {
-                // retry as we hit an optimistic update error
-                LOG.warn("OptimisticUpdateException occurred at attempt " + attempt + " executing work " + work + ". Will do rollback and retry.");
-                // no harm doing rollback before retry and no wait is needed
-                tx.rollback();
+                if (handleOptimisticLockingException) {
+                    // retry as we hit an optimistic update error
+                    LOG.warn("OptimisticUpdateException occurred at attempt " + attempt + " executing work " + work + ". Will do rollback and retry.");
+                    // no harm doing rollback before retry and no wait is needed
+                    tx.rollback();
+                } else {
+                    // we must rollback and rethrow
+                    tx.rollback();
+                    throw e;
+                }
             } catch (RuntimeException e) {
                 LOG.warn("Error executing work " + work + ". Will do rollback.", e);
                 tx.rollback();