You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/04/16 10:16:26 UTC

[2/4] git commit: CAMEL-7118 Add Hazelcast-based Recoverable Aggregation Repository with thanks to Alexander

CAMEL-7118 Add Hazelcast-based Recoverable Aggregation Repository with thanks to Alexander


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b45fff0a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b45fff0a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b45fff0a

Branch: refs/heads/master
Commit: b45fff0a36c66afcc97d5d8e3e600ba82a3cf8aa
Parents: 529f5da
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Apr 16 14:53:57 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Apr 16 16:09:32 2014 +0800

----------------------------------------------------------------------
 components/camel-hazelcast/pom.xml              |   3 +-
 .../HazelcastAggregationRepository.java         | 403 +++++++++++++++++++
 ...stAggregationRepositoryCamelTestSupport.java |  45 +++
 ...stAggregationRepositoryConstructorsTest.java |  65 +++
 ...castAggregationRepositoryOperationsTest.java | 155 +++++++
 ...regationRepositoryRecoverableRoutesTest.java | 133 ++++++
 ...azelcastAggregationRepositoryRoutesTest.java |  84 ++++
 .../hazelcast/SumOfIntsAggregationStrategy.java |  24 ++
 8 files changed, 911 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/pom.xml b/components/camel-hazelcast/pom.xml
index 7e04e90..3bdf208 100644
--- a/components/camel-hazelcast/pom.xml
+++ b/components/camel-hazelcast/pom.xml
@@ -35,7 +35,8 @@
       </camel.osgi.import.before.defaults>
       <camel.osgi.export.pkg>
             org.apache.camel.component.hazelcast.*;${camel.osgi.version},
-            org.apache.camel.processor.idempotent.hazelcast.*
+            org.apache.camel.processor.idempotent.hazelcast.*,
+            org.apache.camel.processor.aggregate.hazelcast.*
 	    </camel.osgi.export.pkg>
       <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=hazelcast</camel.osgi.export.service>
     </properties>

http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
new file mode 100644
index 0000000..3b493ca
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
@@ -0,0 +1,403 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.TransactionalMap;
+import com.hazelcast.transaction.TransactionContext;
+import com.hazelcast.transaction.TransactionOptions;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
+import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Hazelcast-based AggregationRepository implementing
+ * {@link RecoverableAggregationRepository} and {@link OptimisticLockingAggregationRepository}.
+ * Defaults to thread-safe (non-optimistic) locking and recoverable strategy.
+ * Hazelcast settings are given to an end-user and can be controlled with repositoryName and persistentRespositoryName,
+ * both are {@link com.hazelcast.core.IMap} &lt;String, Exchange&gt;. However HazelcastAggregationRepository
+ * can run it's own Hazelcast instance, but obviously no benefits of Hazelcast clustering are gained this way.
+ * If the {@link HazelcastAggregationRepository} uses it's own local {@link HazelcastInstance} it will destroy this
+ * instance on {@link #doStop()}. You should control {@link HazelcastInstance} lifecycle yourself whenever you instantiate
+ * {@link HazelcastAggregationRepository} passing a reference to the instance.
+ *
+ */
+public final class HazelcastAggregationRepository extends ServiceSupport
+                                                  implements RecoverableAggregationRepository,
+                                                             OptimisticLockingAggregationRepository {
+    private boolean optimistic;
+    private boolean useLocalHzInstance;
+    private boolean useRecovery = true;
+    private IMap<String, DefaultExchangeHolder> cache;
+    private IMap<String, DefaultExchangeHolder> persistedCache;
+    private static final Logger LOG = LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName()) ;
+    private HazelcastInstance hzInstance;
+    private String mapName;
+    private String persistenceMapName;
+    private static final String COMPLETED_SUFFIX = "-completed";
+    private String deadLetterChannel;
+    private long recoveryInterval = 5000;
+    private int maximumRedeliveries = 3;
+
+    /**
+     * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
+     * with recoverable behavior and a local Hazelcast instance. Recoverable repository name defaults to
+     * {@code repositoryName} + "-compeleted".
+     * @param repositoryName {@link IMap} repository name;
+     */
+    public HazelcastAggregationRepository(final String repositoryName) {
+        mapName = repositoryName;
+        persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX);
+        optimistic = false;
+        useLocalHzInstance = true;
+    }
+
+    /**
+    * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
+    * with recoverable behavior and a local Hazelcast instance.
+    * @param repositoryName {@link IMap} repository name;
+    * @param  persistentRepositoryName {@link IMap} recoverable repository name;
+    */
+    public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName) {
+        mapName = repositoryName;
+        persistenceMapName = persistentRepositoryName;
+        optimistic = false;
+        useLocalHzInstance = true;
+    }
+
+    /**
+     * Creates new {@link HazelcastAggregationRepository} with recoverable behavior and a local Hazelcast instance.
+     * Recoverable repository name defaults to {@code repositoryName} + "-compeleted".
+     * @param repositoryName {@link IMap} repository name;
+     * @param  optimistic whether to use optimistic locking manner.
+     */
+    public HazelcastAggregationRepository(final String repositoryName, boolean optimistic) {
+        this(repositoryName);
+        this.optimistic = optimistic;
+        useLocalHzInstance = true;
+    }
+
+    /**
+     * Creates new {@link HazelcastAggregationRepository} with recoverable behavior and a local Hazelcast instance.
+     * @param repositoryName {@link IMap} repository name;
+     * @param  persistentRepositoryName {@link IMap} recoverable repository name;
+     * @param optimistic whether to use optimistic locking manner.
+     */
+    public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic) {
+        this(repositoryName, persistentRepositoryName);
+        this.optimistic = optimistic;
+        useLocalHzInstance = true;
+    }
+
+    /**
+     * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
+     * with recoverable behavior. Recoverable repository name defaults to
+     * {@code repositoryName} + "-compeleted".
+     * @param repositoryName {@link IMap} repository name;
+     * @param hzInstanse externally configured {@link HazelcastInstance}.
+     */
+    public HazelcastAggregationRepository(final String repositoryName, HazelcastInstance hzInstanse) {
+        this (repositoryName, false);
+        this.hzInstance = hzInstanse;
+        useLocalHzInstance = false;
+    }
+
+    /**
+     * Creates new {@link HazelcastAggregationRepository} that defaults to non-optimistic locking
+     * with recoverable behavior.
+     * @param repositoryName {@link IMap} repository name;
+     * @param  persistentRepositoryName {@link IMap} recoverable repository name;
+     * @param hzInstanse externally configured {@link HazelcastInstance}.
+     */
+    public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, HazelcastInstance hzInstanse) {
+        this (repositoryName, persistentRepositoryName, false);
+        this.hzInstance = hzInstanse;
+        useLocalHzInstance = false;
+    }
+
+    /**
+     * Creates new {@link HazelcastAggregationRepository} with recoverable behavior.
+     * Recoverable repository name defaults to {@code repositoryName} + "-compeleted".
+     * @param repositoryName {@link IMap} repository name;
+     * @param  optimistic whether to use optimistic locking manner;
+     * @param hzInstance  externally configured {@link HazelcastInstance}.
+     */
+    public HazelcastAggregationRepository(final String repositoryName, boolean optimistic, HazelcastInstance hzInstance) {
+        this(repositoryName, optimistic);
+        this.hzInstance = hzInstance;
+        useLocalHzInstance = false;
+    }
+
+    /**
+     * Creates new {@link HazelcastAggregationRepository} with recoverable behavior.
+     * @param repositoryName {@link IMap} repository name;
+     * @param optimistic whether to use optimistic locking manner;
+     * @param persistentRepositoryName {@link IMap} recoverable repository name;
+     * @param hzInstance  externally configured {@link HazelcastInstance}.
+     */
+    public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic, HazelcastInstance hzInstance) {
+        this(repositoryName, persistentRepositoryName, optimistic);
+        this.hzInstance = hzInstance;
+        useLocalHzInstance = false;
+    }
+
+    @Override
+    public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException {
+        if (!optimistic) {
+            throw new UnsupportedOperationException();
+        }
+        LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), key);
+        if (oldExchange == null) {
+            DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(newExchange);
+            final DefaultExchangeHolder misbehaviorHolder = cache.putIfAbsent(key, holder);
+            if (misbehaviorHolder != null) {
+                Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder);
+                LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned",
+                        key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>");
+                throw  new OptimisticLockingException();
+            }
+        } else {
+            DefaultExchangeHolder oldHolder = DefaultExchangeHolder.marshal(oldExchange);
+            DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange);
+            if (!cache.replace(key, oldHolder, newHolder)) {
+                LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one",
+                        key);
+                throw new OptimisticLockingException();
+            }
+        }
+        LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", newExchange.getExchangeId(), key);
+        return oldExchange;
+    }
+
+    @Override
+    public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
+        if (optimistic){
+            throw new UnsupportedOperationException();
+        }
+        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+        Lock l = hzInstance.getLock(mapName);
+        try {
+            l.lock();
+            DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange);
+            DefaultExchangeHolder oldHolder = cache.put(key, newHolder);
+            return unmarshallExchange(camelContext, oldHolder);
+        } finally {
+            LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+            l.unlock();
+        }
+    }
+
+    @Override
+    public Set<String> scan(CamelContext camelContext) {
+        if (useRecovery) {
+            LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName());
+            Set<String> scanned = Collections.unmodifiableSet(persistedCache.keySet());
+            LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(),camelContext.getName());
+            return scanned;
+        }
+        else {
+            LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!",
+                    camelContext.getName(), mapName);
+            return Collections.emptySet();
+        }
+    }
+
+    @Override
+    public Exchange recover(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
+        return useRecovery ? unmarshallExchange(camelContext, persistedCache.get(exchangeId)) : null;
+    }
+
+    @Override
+    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+        this.recoveryInterval = timeUnit.toMillis(interval);
+    }
+
+    @Override
+    public void setRecoveryInterval(long interval) {
+        this.recoveryInterval = interval;
+    }
+
+    @Override
+    public long getRecoveryIntervalInMillis() {
+        return recoveryInterval;
+    }
+
+    @Override
+    public void setUseRecovery(boolean useRecovery) {
+        this.useRecovery = useRecovery;
+    }
+
+    @Override
+    public boolean isUseRecovery() {
+        return useRecovery;
+    }
+
+    @Override
+    public void setDeadLetterUri(String deadLetterUri) {
+        this.deadLetterChannel = deadLetterUri;
+    }
+
+    @Override
+    public String getDeadLetterUri() {
+        return deadLetterChannel;
+    }
+
+    @Override
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    @Override
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    @Override
+    public Exchange get(CamelContext camelContext, String key) {
+        return unmarshallExchange(camelContext, cache.get(key));
+    }
+
+    /**
+     * This method performs transactional operation on removing the {@code exchange}
+     * from the operational storage and moving it into the persistent one if the {@link HazelcastAggregationRepository}
+     * runs in recoverable mode and {@code optimistic} is false. It will act at <u>your own</u> risk otherwise.
+     * @param camelContext   the current CamelContext
+     * @param key            the correlation key
+     * @param exchange       the exchange to remove
+     */
+    @Override
+    public void remove(CamelContext camelContext, String key, Exchange exchange) {
+        DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange);
+        if (optimistic) {
+            LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key);
+            if (!cache.remove(key, holder)) {
+                LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.",
+                        key);
+                throw new OptimisticLockingException();
+            }
+            LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key);
+            if (useRecovery) {
+                LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.",
+                        exchange.getExchangeId(), key);
+                persistedCache.put(exchange.getExchangeId(), holder);
+                LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.",
+                        exchange.getExchangeId(), key);
+            }
+        } else {
+            if (useRecovery) {
+                LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+                // The only considerable case for transaction usage is fault tolerance:
+                // the transaction will be rolled back automatically (default timeout is 2 minutes)
+                // if no commit occurs during the timeout. So we are still consistent whether local node crashes.
+                TransactionOptions tOpts = new TransactionOptions();
+
+                tOpts.setTransactionType(TransactionOptions.TransactionType.LOCAL);
+                TransactionContext tCtx = hzInstance.newTransactionContext(tOpts);
+
+                try {
+
+                    tCtx.beginTransaction();
+
+                    TransactionalMap<String, DefaultExchangeHolder> tCache = tCtx.getMap(cache.getName());
+                    TransactionalMap<String, DefaultExchangeHolder> tPersistentCache = tCtx.getMap(persistedCache.getName());
+
+                    DefaultExchangeHolder removedHolder = tCache.remove(key);
+                    LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.",
+                            exchange.getExchangeId(), key);
+                    tPersistentCache.put(exchange.getExchangeId(), removedHolder);
+
+                    tCtx.commitTransaction();
+                    LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+                    LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.",
+                            exchange.getExchangeId(), key);
+                } catch (Throwable throwable) {
+                    tCtx.rollbackTransaction();
+
+                    final String msg = String.format("Transaction with ID %s was rolled back for remove operation with a key %s and an Exchange ID %s.",
+                            tCtx.getTxnId(), key, exchange.getExchangeId());
+                    LOG.warn(msg, throwable);
+                    throw new RuntimeException(msg, throwable);
+                }
+            } else {
+                cache.remove(key);
+            }
+        }
+    }
+
+    @Override
+    public void confirm(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Confirming an exchange with ID {}.", exchangeId);
+        persistedCache.remove(exchangeId);
+    }
+
+    @Override
+    public Set<String> getKeys() {
+        return Collections.unmodifiableSet(cache.keySet());
+    }
+
+    /**
+     * @return Persistent repository {@link IMap} name;
+     */
+    public String getPersistentRepositoryName() {
+        return persistenceMapName;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (maximumRedeliveries < 0) {
+            throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
+        }
+        if (recoveryInterval < 0) {
+            throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
+        }
+        ObjectHelper.notEmpty(mapName, "repositoryName");
+        if (useLocalHzInstance)  {
+            Config cfg = new XmlConfigBuilder().build();
+            cfg.setProperty("hazelcast.version.check.enabled", "false");
+            hzInstance = Hazelcast.newHazelcastInstance(cfg);
+        } else {
+            ObjectHelper.notNull(hzInstance, "hzInstanse");
+        }
+        cache = hzInstance.getMap(mapName);
+        if (useRecovery) {
+            persistedCache = hzInstance.getMap(persistenceMapName);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (useRecovery) {
+            persistedCache.clear();
+        }
+        cache.clear();
+        if (useLocalHzInstance) {
+            hzInstance.getLifecycleService().shutdown();
+        }
+    }
+
+    private Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
+        Exchange exchange = null;
+        if (holder != null) {
+            exchange = new DefaultExchange(camelContext);
+            DefaultExchangeHolder.unmarshal(exchange, holder);
+        }
+        return exchange;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryCamelTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryCamelTestSupport.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryCamelTestSupport.java
new file mode 100644
index 0000000..166c8a0
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryCamelTestSupport.java
@@ -0,0 +1,45 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * @author Alexander Lomov
+ *         Date: 04.01.14
+ *         Time: 3:00
+ */
+public class HazelcastAggregationRepositoryCamelTestSupport extends CamelTestSupport {
+    private static HazelcastInstance hzOne = null;
+    private static HazelcastInstance hzTwo = null;
+
+    protected static void doInitializeHazelcastInstances() {
+        hzOne = Hazelcast.newHazelcastInstance();
+        hzTwo = Hazelcast.newHazelcastInstance();
+    }
+
+    protected static void doDestroyHazelcastInstances() {
+        hzOne.getLifecycleService().shutdown();
+        hzTwo.getLifecycleService().shutdown();
+    }
+
+    protected static HazelcastInstance getFirstInstance() {
+        return hzOne;
+    }
+
+    protected static HazelcastInstance getSecondInstance() {
+        return hzTwo;
+    }
+
+    @BeforeClass
+    public static void setUpHazelcastCluster() {
+        doInitializeHazelcastInstances();
+    }
+
+    @AfterClass
+    public static void shutDownHazelcastCluster() {
+        doDestroyHazelcastInstances();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryConstructorsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryConstructorsTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryConstructorsTest.java
new file mode 100644
index 0000000..5258129
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryConstructorsTest.java
@@ -0,0 +1,65 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+
+public class HazelcastAggregationRepositoryConstructorsTest extends CamelTestSupport {
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void nonOptimisticRepoFailsOnOptimisticAdd() throws Exception {
+        final String repoName = "hzRepoMap";
+        HazelcastAggregationRepository repo = new HazelcastAggregationRepository(repoName);
+        repo.doStart();
+
+        try {
+            Exchange oldOne = new DefaultExchange(context());
+            Exchange newOne = new DefaultExchange(context());
+            final String key = "abrakadabra";
+            repo.add(context(), key, oldOne, newOne);
+            fail("OptimisticLockingException should has been thrown");
+        } finally {
+            repo.doStop();
+        }
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void optimisticRepoFailsForNonOptimisticAdd() throws Exception {
+        final String repoName = "hzRepoMap";
+        HazelcastAggregationRepository repo = new HazelcastAggregationRepository(repoName, true);
+        repo.doStart();
+
+        try {
+            Exchange ex = new DefaultExchange(context());
+            final String key = "abrakadabra";
+            repo.add(context(), key, ex);
+        } finally {
+            repo.doStop();
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void uninitializedHazelcastInstanceThrows() throws Exception {
+        final String repoName = "hzRepoMap";
+        HazelcastAggregationRepository repo = new HazelcastAggregationRepository(repoName, (HazelcastInstance) null);
+        repo.doStart();
+    }
+
+    @Test
+    public void locallyInitializedHazelcastInstanceAdd() throws Exception {
+       HazelcastAggregationRepository repo = new HazelcastAggregationRepository("hzRepoMap");
+        try {
+            repo.doStart();
+            Exchange ex = new DefaultExchange(context());
+            repo.add(context(), "somedefaultkey", ex);
+        //} catch (Throwable e) {
+            //fail(e.getMessage());
+        }
+        finally {
+            repo.doStop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryOperationsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryOperationsTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryOperationsTest.java
new file mode 100644
index 0000000..fea8176
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryOperationsTest.java
@@ -0,0 +1,155 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+
+import java.util.Set;
+
+
+public class HazelcastAggregationRepositoryOperationsTest extends HazelcastAggregationRepositoryCamelTestSupport {
+
+
+    private static final String THREAD_SAFE_REPO = "threadSafeRepo";
+    private static final String OPTIMISTIC_REPO = "optimisticRepo";
+
+
+    @Test
+    public void checkOptimisticAddOfNewExchange() throws Exception {
+        HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(OPTIMISTIC_REPO, true, getFirstInstance());
+        HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(OPTIMISTIC_REPO, true, getSecondInstance());
+
+        try {
+            repoOne.doStart();
+            repoTwo.doStart();
+
+            final String testBody = "This is an optimistic test body. Sincerely yours, Captain Obvious.";
+            final String key = "optimisticKey";
+            Exchange newEx = createExchangeWithBody(testBody);
+            Exchange oldEx = repoOne.add(context(), key, null, newEx);
+
+            assertNull("Old exchange should be null.", oldEx);
+
+            final String theNewestBody = "This is the newest test body.";
+            Exchange theNewestEx = createExchangeWithBody(theNewestBody);
+
+            oldEx = repoTwo.add(context(), key, newEx, theNewestEx);
+            assertEquals(newEx.getIn().getBody(), oldEx.getIn().getBody());
+
+        } finally {
+            repoOne.stop();
+            repoTwo.stop();
+        }
+    }
+
+    @Test
+    public void checkThreadSafeAddOfNewExchange() throws Exception {
+        HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(THREAD_SAFE_REPO, false, getFirstInstance());
+        HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(THREAD_SAFE_REPO, false, getSecondInstance());
+
+        try {
+            repoOne.doStart();
+            repoTwo.doStart();
+
+            final String testBody = "This is a thread-safe test body. Sincerely yours, Captain Obvious.";
+            final String key = "threadSafeKey";
+            Exchange newEx = createExchangeWithBody(testBody);
+            Exchange oldEx = repoOne.add(context(), key, newEx);
+
+            assertNull("Old exchange should be null.", oldEx);
+
+            final String theNewestBody = "This is the newest test body.";
+            Exchange theNewestEx = createExchangeWithBody(theNewestBody);
+
+            oldEx = repoTwo.add(context(), key, theNewestEx);
+            assertEquals(newEx.getIn().getBody(), oldEx.getIn().getBody());
+
+        } finally {
+            repoOne.stop();
+            repoTwo.stop();
+        }
+    }
+
+    @Test
+    public void checkOptimisticGet() throws Exception {
+        HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(THREAD_SAFE_REPO, true, getFirstInstance());
+        HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(THREAD_SAFE_REPO, true, getSecondInstance());
+        try {
+            repoOne.doStart();
+            repoTwo.doStart();
+
+            final String testBody = "This is an optimistic test body. Sincerely yours, Captain Obvious.";
+            final String key = "optimisticKey";
+
+            Exchange ex = createExchangeWithBody(testBody);
+            repoOne.add(context(), key, null, ex);
+
+            Exchange gotEx = repoTwo.get(context(), key);
+            assertEquals("ex and gotEx should be equal", gotEx.getIn().getBody(), ex.getIn().getBody());
+        } finally {
+            repoOne.doStop();
+            repoTwo.doStop();
+        }
+    }
+
+    @Test
+    public void checkThreadSafeGet() throws Exception {
+        HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(OPTIMISTIC_REPO, false, getFirstInstance());
+        HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(OPTIMISTIC_REPO, false, getSecondInstance());
+
+        try {
+            repoOne.doStart();
+            repoTwo.doStart();
+
+
+            final String testBody = "This is a thread-safe test body. Sincerely yours, Captain Obvious.";
+            final String key = "threadSafeKey";
+
+            Exchange ex = createExchangeWithBody(testBody);
+            repoOne.add(context(), key, ex);
+
+            Exchange gotEx = repoTwo.get(context(), key);
+            assertEquals("ex and gotEx should be equal", gotEx.getIn().getBody(), ex.getIn().getBody());
+        } finally {
+            repoOne.doStop();
+            repoTwo.doStop();
+        }
+    }
+
+    @Test
+    public void checkOptimisticPersistentRemove() throws Exception {
+        final String persistentRepoName = String.format("%s-completed", OPTIMISTIC_REPO);
+        HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(OPTIMISTIC_REPO, persistentRepoName, true, getFirstInstance());
+        HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(OPTIMISTIC_REPO, persistentRepoName, true, getSecondInstance());
+
+        try {
+            repoOne.doStart();
+            repoTwo.doStart();
+
+            final String testBody = "This is an optimistic test body. Sincerely yours, Captain Obvious.";
+            final String key = "optimisticKey";
+
+            Exchange ex = createExchangeWithBody(testBody);
+
+            repoOne.add(context(), key, null, ex);
+
+            Exchange getBackEx = repoTwo.get(context(), key);
+            assertNotNull("getBackEx should not be null.", getBackEx);
+
+            repoTwo.remove(context(), key, ex);
+
+            getBackEx = repoOne.get(context(), key);
+            assertNull("getBackEx should be null here.", getBackEx);
+
+            Set<String> keys = repoTwo.scan(context());
+            assertCollectionSize(keys, 1);
+
+            getBackEx = repoOne.recover(context(), keys.iterator().next());
+            assertNotNull("getBackEx got from persistent repo should not be null.", getBackEx);
+
+
+        } finally {
+            repoOne.doStop();
+            repoTwo.doStop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRecoverableRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRecoverableRoutesTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRecoverableRoutesTest.java
new file mode 100644
index 0000000..cea5903
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRecoverableRoutesTest.java
@@ -0,0 +1,133 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @author Alexander Lomov
+ *         Date: 04.01.14
+ *         Time: 4:37
+ */
+
+public class HazelcastAggregationRepositoryRecoverableRoutesTest extends HazelcastAggregationRepositoryCamelTestSupport {
+
+    private static final String REPO_NAME = "routeTestRepo";
+    private static final String MOCK_GOTCHA = "mock:gotcha";
+    private static final String MOCK_FAILURE = "mock:failure";
+    private static final String DIRECT_ONE = "direct:one";
+    private static final String DIRECT_TWO = "direct:two";
+
+    @EndpointInject(uri = MOCK_GOTCHA)
+    private MockEndpoint mockGotcha;
+
+    @EndpointInject(uri = MOCK_FAILURE)
+    private MockEndpoint mockFailure;
+
+    @Produce(uri = DIRECT_ONE)
+    private ProducerTemplate produceOne;
+
+    @Produce(uri = DIRECT_TWO)
+    private ProducerTemplate produceTwo;
+
+    @Test
+    public void checkAggregationFromTwoRoutesWithRecovery() throws Exception {
+        final HazelcastAggregationRepository repoOne =
+                new HazelcastAggregationRepository(REPO_NAME, false, getFirstInstance());
+
+        final HazelcastAggregationRepository repoTwo =
+                new HazelcastAggregationRepository(REPO_NAME, false, getSecondInstance());
+
+        final int completionSize = 4;
+        final String correlator = "CORRELATOR";
+
+        RouteBuilder rbOne = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(EverythingIsLostException.class)
+                    .handled(true)
+                    .useOriginalMessage()
+                    .to(MOCK_GOTCHA)
+                .end();
+
+                interceptSendToEndpoint(MOCK_FAILURE)
+                    .throwException(new EverythingIsLostException("The field is lost... everything is lost"))
+                .end();
+
+                from(DIRECT_ONE)
+                    .aggregate(header(correlator))
+                    .aggregationRepository(repoOne)
+                    .aggregationStrategy(new SumOfIntsAggregationStrategy())
+                    .completionSize(completionSize)
+                .to(MOCK_FAILURE);
+
+            }
+        };
+
+
+        RouteBuilder rbTwo = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                onException(EverythingIsLostException.class)
+                    .handled(true)
+                    .useOriginalMessage()
+                    .to(MOCK_GOTCHA)
+                .end();
+
+                interceptSendToEndpoint(MOCK_FAILURE)
+                    .throwException(new EverythingIsLostException("The field is lost... everything is lost"))
+                .end();
+
+                from(DIRECT_TWO)
+                    .aggregate(header(correlator))
+                    .aggregationRepository(repoTwo)
+                    .aggregationStrategy(new SumOfIntsAggregationStrategy())
+                    .completionSize(completionSize)
+                .to(MOCK_FAILURE);
+            }
+        };
+
+        context().addRoutes(rbOne);
+        context().addRoutes(rbTwo);
+        context().start();
+
+        mockFailure.expectedMessageCount(0);
+        mockGotcha.expectedMessageCount(1);
+        mockGotcha.expectedBodiesReceived(1 + 2 + 3 + 4);
+
+        produceOne.sendBodyAndHeader(4, correlator, correlator);
+        produceTwo.sendBodyAndHeader(2, correlator, correlator);
+        produceOne.sendBodyAndHeader(3, correlator, correlator);
+        produceTwo.sendBodyAndHeader(1, correlator, correlator);
+
+        mockFailure.assertIsSatisfied();
+        mockFailure.assertIsSatisfied();
+    }
+
+    @SuppressWarnings("unused")
+    private static class EverythingIsLostException extends Exception {
+        private EverythingIsLostException() {
+        }
+
+        private EverythingIsLostException(String message) {
+            super(message);
+        }
+
+        private EverythingIsLostException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+        private EverythingIsLostException(Throwable cause) {
+            super(cause);
+        }
+
+        private EverythingIsLostException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+            super(message, cause, enableSuppression, writableStackTrace);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
new file mode 100644
index 0000000..2b16a5d
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
@@ -0,0 +1,84 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @author Alexander Lomov
+ *         Date: 04.01.14
+ *         Time: 2:40
+ */
+
+public class HazelcastAggregationRepositoryRoutesTest extends HazelcastAggregationRepositoryCamelTestSupport {
+
+    private static final String REPO_NAME = "routeTestRepo";
+    private static final String MOCK_GOTCHA = "mock:gotcha";
+    private static final String DIRECT_ONE = "direct:one";
+    private static final String DIRECT_TWO = "direct:two";
+
+    @EndpointInject(uri = MOCK_GOTCHA)
+    private MockEndpoint mock;
+
+    @Produce(uri = DIRECT_ONE)
+    private ProducerTemplate produceOne;
+
+    @Produce(uri = DIRECT_TWO)
+    private ProducerTemplate produceTwo;
+
+
+    @Test
+    public void checkAggregationFromTwoRoutes() throws Exception {
+        final HazelcastAggregationRepository repoOne =
+                new HazelcastAggregationRepository(REPO_NAME, false, getFirstInstance());
+
+        final HazelcastAggregationRepository repoTwo =
+                new HazelcastAggregationRepository(REPO_NAME, false, getSecondInstance());
+
+        final int completionSize = 4;
+        final String correlator = "CORRELATOR";
+        RouteBuilder rbOne = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_ONE).routeId("AggregatingRouteOne")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoOne)
+                        .aggregationStrategy(new SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        RouteBuilder rbTwo = new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from(DIRECT_TWO).routeId("AggregatingRouteTwo")
+                        .aggregate(header(correlator))
+                        .aggregationRepository(repoTwo)
+                        .aggregationStrategy(new SumOfIntsAggregationStrategy())
+                        .completionSize(completionSize)
+                        .to(MOCK_GOTCHA);
+            }
+        };
+
+        context().addRoutes(rbOne);
+        context().addRoutes(rbTwo);
+        context().start();
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(1 + 2 + 3 + 4);
+
+        produceOne.sendBodyAndHeader(1, correlator, correlator);
+        produceTwo.sendBodyAndHeader(2, correlator, correlator);
+        produceOne.sendBodyAndHeader(3, correlator, correlator);
+        produceOne.sendBodyAndHeader(4, correlator, correlator);
+
+        mock.assertIsSatisfied();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/SumOfIntsAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/SumOfIntsAggregationStrategy.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/SumOfIntsAggregationStrategy.java
new file mode 100644
index 0000000..bf88b63
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/SumOfIntsAggregationStrategy.java
@@ -0,0 +1,24 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+* @author Alexander Lomov
+*         Date: 04.01.14
+*         Time: 13:25
+*/
+class SumOfIntsAggregationStrategy implements AggregationStrategy {
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange;
+        } else {
+            Integer n = newExchange.getIn().getBody(Integer.class);
+            Integer o = oldExchange.getIn().getBody(Integer.class);
+            Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
+            oldExchange.getIn().setBody(v, Integer.class);
+            return oldExchange;
+        }
+    }
+}