You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/04/16 10:16:26 UTC
[2/4] git commit: CAMEL-7118 Add Hazelcast-based Recoverable
Aggregation Repository with thanks to Alexander
CAMEL-7118 Add Hazelcast-based Recoverable Aggregation Repository with thanks to Alexander
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b45fff0a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b45fff0a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b45fff0a
Branch: refs/heads/master
Commit: b45fff0a36c66afcc97d5d8e3e600ba82a3cf8aa
Parents: 529f5da
Author: Willem Jiang <wi...@gmail.com>
Authored: Wed Apr 16 14:53:57 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Wed Apr 16 16:09:32 2014 +0800
----------------------------------------------------------------------
components/camel-hazelcast/pom.xml | 3 +-
.../HazelcastAggregationRepository.java | 403 +++++++++++++++++++
...stAggregationRepositoryCamelTestSupport.java | 45 +++
...stAggregationRepositoryConstructorsTest.java | 65 +++
...castAggregationRepositoryOperationsTest.java | 155 +++++++
...regationRepositoryRecoverableRoutesTest.java | 133 ++++++
...azelcastAggregationRepositoryRoutesTest.java | 84 ++++
.../hazelcast/SumOfIntsAggregationStrategy.java | 24 ++
8 files changed, 911 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/pom.xml b/components/camel-hazelcast/pom.xml
index 7e04e90..3bdf208 100644
--- a/components/camel-hazelcast/pom.xml
+++ b/components/camel-hazelcast/pom.xml
@@ -35,7 +35,8 @@
</camel.osgi.import.before.defaults>
<camel.osgi.export.pkg>
org.apache.camel.component.hazelcast.*;${camel.osgi.version},
- org.apache.camel.processor.idempotent.hazelcast.*
+ org.apache.camel.processor.idempotent.hazelcast.*,
+ org.apache.camel.processor.aggregate.hazelcast.*
</camel.osgi.export.pkg>
<camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=hazelcast</camel.osgi.export.service>
</properties>
http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
new file mode 100644
index 0000000..3b493ca
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepository.java
@@ -0,0 +1,403 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.TransactionalMap;
+import com.hazelcast.transaction.TransactionContext;
+import com.hazelcast.transaction.TransactionOptions;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
+import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Hazelcast-based AggregationRepository implementing
+ * {@link RecoverableAggregationRepository} and {@link OptimisticLockingAggregationRepository}.
+ * Defaults to thread-safe (non-optimistic) locking and recoverable strategy.
+ * Hazelcast settings are given to an end-user and can be controlled with repositoryName and persistentRepositoryName,
+ * both name a {@link com.hazelcast.core.IMap} keyed by String and holding {@link DefaultExchangeHolder} values. However HazelcastAggregationRepository
+ * can run its own Hazelcast instance, but obviously no benefits of Hazelcast clustering are gained this way.
+ * If the {@link HazelcastAggregationRepository} uses its own local {@link HazelcastInstance} it will destroy this
+ * instance on {@link #doStop()}. You should control {@link HazelcastInstance} lifecycle yourself whenever you instantiate
+ * {@link HazelcastAggregationRepository} passing a reference to the instance.
+ *
+ */
+public final class HazelcastAggregationRepository extends ServiceSupport
+ implements RecoverableAggregationRepository,
+ OptimisticLockingAggregationRepository {
+ private boolean optimistic;
+ private boolean useLocalHzInstance;
+ private boolean useRecovery = true;
+ private IMap<String, DefaultExchangeHolder> cache;
+ private IMap<String, DefaultExchangeHolder> persistedCache;
+ private static final Logger LOG = LoggerFactory.getLogger(HazelcastAggregationRepository.class.getName()) ;
+ private HazelcastInstance hzInstance;
+ private String mapName;
+ private String persistenceMapName;
+ private static final String COMPLETED_SUFFIX = "-completed";
+ private String deadLetterChannel;
+ private long recoveryInterval = 5000;
+ private int maximumRedeliveries = 3;
+
+ /**
+  * Builds a repository that starts its own local Hazelcast instance and uses
+  * non-optimistic (lock-based) access with recovery enabled. The recoverable
+  * repository name is derived as {@code repositoryName} + "-completed".
+  * @param repositoryName {@link IMap} repository name;
+  */
+ public HazelcastAggregationRepository(final String repositoryName) {
+     this.mapName = repositoryName;
+     // Same result as formatting "%s%s": a null name yields "null-completed" either way.
+     this.persistenceMapName = repositoryName + COMPLETED_SUFFIX;
+     this.useLocalHzInstance = true;
+     this.optimistic = false;
+ }
+
+ /**
+  * Builds a repository that starts its own local Hazelcast instance and uses
+  * non-optimistic (lock-based) access with recovery enabled.
+  * @param repositoryName {@link IMap} repository name;
+  * @param persistentRepositoryName {@link IMap} recoverable repository name;
+  */
+ public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName) {
+     this.mapName = repositoryName;
+     this.persistenceMapName = persistentRepositoryName;
+     this.useLocalHzInstance = true;
+     this.optimistic = false;
+ }
+
+ /**
+  * Builds a repository that starts its own local Hazelcast instance, with recovery enabled
+  * and a caller-selected locking mode. The recoverable repository name is derived as
+  * {@code repositoryName} + "-completed".
+  * @param repositoryName {@link IMap} repository name;
+  * @param optimistic whether to use optimistic locking manner.
+  */
+ public HazelcastAggregationRepository(final String repositoryName, boolean optimistic) {
+     this(repositoryName);
+     this.useLocalHzInstance = true;
+     this.optimistic = optimistic;
+ }
+
+ /**
+  * Builds a repository that starts its own local Hazelcast instance, with recovery enabled
+  * and a caller-selected locking mode.
+  * @param repositoryName {@link IMap} repository name;
+  * @param persistentRepositoryName {@link IMap} recoverable repository name;
+  * @param optimistic whether to use optimistic locking manner.
+  */
+ public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic) {
+     this(repositoryName, persistentRepositoryName);
+     this.useLocalHzInstance = true;
+     this.optimistic = optimistic;
+ }
+
+ /**
+  * Builds a repository on top of an externally managed Hazelcast instance, using
+  * non-optimistic (lock-based) access with recovery enabled. The recoverable repository
+  * name is derived as {@code repositoryName} + "-completed".
+  * @param repositoryName {@link IMap} repository name;
+  * @param hzInstanse externally configured {@link HazelcastInstance}.
+  */
+ public HazelcastAggregationRepository(final String repositoryName, HazelcastInstance hzInstanse) {
+     this(repositoryName, false);
+     // The delegated constructor marks the instance as local; override that here.
+     this.useLocalHzInstance = false;
+     this.hzInstance = hzInstanse;
+ }
+
+ /**
+  * Builds a repository on top of an externally managed Hazelcast instance, using
+  * non-optimistic (lock-based) access with recovery enabled.
+  * @param repositoryName {@link IMap} repository name;
+  * @param persistentRepositoryName {@link IMap} recoverable repository name;
+  * @param hzInstanse externally configured {@link HazelcastInstance}.
+  */
+ public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, HazelcastInstance hzInstanse) {
+     this(repositoryName, persistentRepositoryName, false);
+     // The delegated constructor marks the instance as local; override that here.
+     this.useLocalHzInstance = false;
+     this.hzInstance = hzInstanse;
+ }
+
+ /**
+  * Builds a repository on top of an externally managed Hazelcast instance, with recovery
+  * enabled and a caller-selected locking mode. The recoverable repository name is derived
+  * as {@code repositoryName} + "-completed".
+  * @param repositoryName {@link IMap} repository name;
+  * @param optimistic whether to use optimistic locking manner;
+  * @param hzInstance externally configured {@link HazelcastInstance}.
+  */
+ public HazelcastAggregationRepository(final String repositoryName, boolean optimistic, HazelcastInstance hzInstance) {
+     this(repositoryName, optimistic);
+     // The delegated constructor marks the instance as local; override that here.
+     this.useLocalHzInstance = false;
+     this.hzInstance = hzInstance;
+ }
+
+ /**
+  * Builds a repository on top of an externally managed Hazelcast instance, with recovery
+  * enabled and a caller-selected locking mode.
+  * @param repositoryName {@link IMap} repository name;
+  * @param persistentRepositoryName {@link IMap} recoverable repository name;
+  * @param optimistic whether to use optimistic locking manner;
+  * @param hzInstance externally configured {@link HazelcastInstance}.
+  */
+ public HazelcastAggregationRepository(final String repositoryName, final String persistentRepositoryName, boolean optimistic, HazelcastInstance hzInstance) {
+     this(repositoryName, persistentRepositoryName, optimistic);
+     // The delegated constructor marks the instance as local; override that here.
+     this.useLocalHzInstance = false;
+     this.hzInstance = hzInstance;
+ }
+
+ /**
+  * Stores {@code newExchange} under {@code key} using conditional map operations instead of a lock.
+  * With no {@code oldExchange} the entry is inserted only if the key is absent; otherwise the entry
+  * is replaced only if the stored value still equals the marshalled {@code oldExchange}.
+  *
+  * @param camelContext the current CamelContext
+  * @param key the correlation key
+  * @param oldExchange the exchange the caller believes is currently stored, or {@code null} for a first insert
+  * @param newExchange the exchange to store
+  * @return {@code oldExchange}, exactly as passed in
+  * @throws OptimisticLockingException if the conditional insert or replace did not apply
+  * @throws UnsupportedOperationException if the repository is not in optimistic mode
+  */
+ @Override
+ public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingException {
+     if (!optimistic) {
+         throw new UnsupportedOperationException();
+     }
+     LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), key);
+     if (oldExchange != null) {
+         // Update path: swap only when the stored holder still matches what the caller saw.
+         final DefaultExchangeHolder expected = DefaultExchangeHolder.marshal(oldExchange);
+         final DefaultExchangeHolder replacement = DefaultExchangeHolder.marshal(newExchange);
+         final boolean replaced = cache.replace(key, expected, replacement);
+         if (!replaced) {
+             LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one",
+                 key);
+             throw new OptimisticLockingException();
+         }
+     } else {
+         // Insert path: a non-null result means another writer got there first.
+         final DefaultExchangeHolder candidate = DefaultExchangeHolder.marshal(newExchange);
+         final DefaultExchangeHolder existing = cache.putIfAbsent(key, candidate);
+         if (existing != null) {
+             final Exchange existingExchange = unmarshallExchange(camelContext, existing);
+             final String existingId = existingExchange != null ? existingExchange.getExchangeId() : "<null>";
+             LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned",
+                 key, existingId);
+             throw new OptimisticLockingException();
+         }
+     }
+     LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", newExchange.getExchangeId(), key);
+     return oldExchange;
+ }
+
+ /**
+  * Stores {@code exchange} under {@code key} while holding the Hazelcast lock named after the
+  * repository map, and returns whatever was stored under that key before.
+  *
+  * @param camelContext the current CamelContext
+  * @param key the correlation key
+  * @param exchange the exchange to store
+  * @return the previously stored exchange, or {@code null} if the key was absent
+  * @throws UnsupportedOperationException if the repository is in optimistic mode
+  */
+ @Override
+ public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
+     if (optimistic) {
+         throw new UnsupportedOperationException();
+     }
+     LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+     Lock l = hzInstance.getLock(mapName);
+     // Acquire before entering try: if lock() itself fails, the finally block must not
+     // call unlock() on a lock this thread does not hold (that would mask the real error).
+     l.lock();
+     try {
+         DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange);
+         DefaultExchangeHolder oldHolder = cache.put(key, newHolder);
+         Exchange previous = unmarshallExchange(camelContext, oldHolder);
+         // Logged only once the put has actually succeeded.
+         LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+         return previous;
+     } finally {
+         l.unlock();
+     }
+ }
+
+ @Override
+ public Set<String> scan(CamelContext camelContext) {
+ if (useRecovery) {
+ LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName());
+ Set<String> scanned = Collections.unmodifiableSet(persistedCache.keySet());
+ LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(),camelContext.getName());
+ return scanned;
+ }
+ else {
+ LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!",
+ camelContext.getName(), mapName);
+ return Collections.emptySet();
+ }
+ }
+
+ /**
+  * Looks up an exchange in the recoverable map by exchange ID.
+  *
+  * @param camelContext the CamelContext used to rebuild the exchange
+  * @param exchangeId the ID the exchange was stored under
+  * @return the rebuilt exchange, or {@code null} when recovery is disabled or nothing is stored
+  */
+ @Override
+ public Exchange recover(CamelContext camelContext, String exchangeId) {
+     LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
+     if (!useRecovery) {
+         return null;
+     }
+     final DefaultExchangeHolder stored = persistedCache.get(exchangeId);
+     return unmarshallExchange(camelContext, stored);
+ }
+
+ /**
+ * Sets the recovery interval, converting the given amount to milliseconds.
+ * @param interval the interval expressed in {@code timeUnit}
+ * @param timeUnit the unit of {@code interval}
+ */
+ @Override
+ public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+ this.recoveryInterval = timeUnit.toMillis(interval);
+ }
+
+ /**
+ * Sets the recovery interval; the value is stored unchanged and later reported
+ * by {@link #getRecoveryIntervalInMillis()}, i.e. it is treated as milliseconds.
+ * A negative value is rejected when the repository is started.
+ * @param interval the interval in milliseconds
+ */
+ @Override
+ public void setRecoveryInterval(long interval) {
+ this.recoveryInterval = interval;
+ }
+
+ /**
+ * @return the configured recovery interval in milliseconds (5000 unless changed)
+ */
+ @Override
+ public long getRecoveryIntervalInMillis() {
+ return recoveryInterval;
+ }
+
+ /**
+ * Enables or disables recovery. The recoverable map is only obtained during start
+ * when this flag is set, so it should be configured before the repository is started.
+ * @param useRecovery whether completed exchanges are kept for recovery
+ */
+ @Override
+ public void setUseRecovery(boolean useRecovery) {
+ this.useRecovery = useRecovery;
+ }
+
+ /**
+ * @return whether recovery is enabled (enabled unless changed)
+ */
+ @Override
+ public boolean isUseRecovery() {
+ return useRecovery;
+ }
+
+ /**
+ * Stores the dead letter endpoint URI; this class only keeps the value for its getter.
+ * @param deadLetterUri the dead letter endpoint URI
+ */
+ @Override
+ public void setDeadLetterUri(String deadLetterUri) {
+ this.deadLetterChannel = deadLetterUri;
+ }
+
+ /**
+ * @return the configured dead letter endpoint URI, or {@code null} if none was set
+ */
+ @Override
+ public String getDeadLetterUri() {
+ return deadLetterChannel;
+ }
+
+ /**
+ * Stores the maximum number of redelivery attempts. A negative value is rejected
+ * when the repository is started.
+ * @param maximumRedeliveries the maximum number of redeliveries
+ */
+ @Override
+ public void setMaximumRedeliveries(int maximumRedeliveries) {
+ this.maximumRedeliveries = maximumRedeliveries;
+ }
+
+ /**
+ * @return the configured maximum number of redeliveries (3 unless changed)
+ */
+ @Override
+ public int getMaximumRedeliveries() {
+ return maximumRedeliveries;
+ }
+
+ /**
+  * Reads the exchange stored under {@code key} in the operational map.
+  *
+  * @param camelContext the CamelContext used to rebuild the exchange
+  * @param key the correlation key
+  * @return the rebuilt exchange, or {@code null} if nothing is stored under the key
+  */
+ @Override
+ public Exchange get(CamelContext camelContext, String key) {
+     final DefaultExchangeHolder stored = cache.get(key);
+     return unmarshallExchange(camelContext, stored);
+ }
+
+ /**
+  * Removes the exchange stored under {@code key} from the operational map and, when recovery is
+  * enabled, stores it in the recoverable map under the exchange ID.
+  * <ul>
+  * <li>Optimistic mode: a conditional remove followed by a separate put; the two steps are not
+  * covered by a transaction.</li>
+  * <li>Non-optimistic mode with recovery: remove and put run inside one local Hazelcast
+  * transaction.</li>
+  * <li>Non-optimistic mode without recovery: a plain remove.</li>
+  * </ul>
+  * @param camelContext the current CamelContext
+  * @param key the correlation key
+  * @param exchange the exchange to remove
+  * @throws OptimisticLockingException in optimistic mode, when the stored value does not equal
+  *         the marshalled {@code exchange}
+  * @throws RuntimeException when the transactional move fails; a rollback is attempted first
+  */
+ @Override
+ public void remove(CamelContext camelContext, String key, Exchange exchange) {
+     DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange);
+     if (optimistic) {
+         removeOptimistically(key, exchange, holder);
+     } else if (useRecovery) {
+         removeTransactionally(key, exchange, holder);
+     } else {
+         cache.remove(key);
+     }
+ }
+
+ /** Conditionally removes the entry and, if recovery is on, copies it to the recoverable map. */
+ private void removeOptimistically(String key, Exchange exchange, DefaultExchangeHolder holder) {
+     LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key);
+     if (!cache.remove(key, holder)) {
+         LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.",
+             key);
+         throw new OptimisticLockingException();
+     }
+     LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), key);
+     if (useRecovery) {
+         LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.",
+             exchange.getExchangeId(), key);
+         persistedCache.put(exchange.getExchangeId(), holder);
+         LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.",
+             exchange.getExchangeId(), key);
+     }
+ }
+
+ /** Moves the entry from the operational map to the recoverable map inside one local transaction. */
+ private void removeTransactionally(String key, Exchange exchange, DefaultExchangeHolder holder) {
+     LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+     // The only considerable case for transaction usage is fault tolerance:
+     // the transaction will be rolled back automatically (default timeout is 2 minutes)
+     // if no commit occurs during the timeout. So we are still consistent whether local node crashes.
+     TransactionOptions tOpts = new TransactionOptions();
+     tOpts.setTransactionType(TransactionOptions.TransactionType.LOCAL);
+     TransactionContext tCtx = hzInstance.newTransactionContext(tOpts);
+
+     try {
+         tCtx.beginTransaction();
+
+         TransactionalMap<String, DefaultExchangeHolder> tCache = tCtx.getMap(cache.getName());
+         TransactionalMap<String, DefaultExchangeHolder> tPersistentCache = tCtx.getMap(persistedCache.getName());
+
+         DefaultExchangeHolder removedHolder = tCache.remove(key);
+         // If nothing was stored under the key, remove() yields null. Persisting that null would
+         // leave the completed exchange unrecoverable (and Hazelcast maps are documented not to
+         // accept null values), so fall back to the exchange handed in by the caller.
+         DefaultExchangeHolder toPersist = removedHolder != null ? removedHolder : holder;
+         LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.",
+             exchange.getExchangeId(), key);
+         tPersistentCache.put(exchange.getExchangeId(), toPersist);
+
+         tCtx.commitTransaction();
+         LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+         LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.",
+             exchange.getExchangeId(), key);
+     } catch (Throwable throwable) {
+         try {
+             tCtx.rollbackTransaction();
+         } catch (Throwable rollbackFailure) {
+             // Keep reporting the original failure; a failed rollback must not hide it.
+             LOG.warn("Rollback failed for remove operation with a key " + key, rollbackFailure);
+         }
+
+         final String msg = String.format("Transaction with ID %s was rolled back for remove operation with a key %s and an Exchange ID %s.",
+             tCtx.getTxnId(), key, exchange.getExchangeId());
+         LOG.warn(msg, throwable);
+         throw new RuntimeException(msg, throwable);
+     }
+ }
+
+ /**
+  * Drops the exchange with the given ID from the recoverable map once it has been
+  * processed successfully. Does nothing beyond logging when recovery is disabled,
+  * because the recoverable map is never created in that mode.
+  *
+  * @param camelContext the current CamelContext
+  * @param exchangeId the ID of the exchange to confirm
+  */
+ @Override
+ public void confirm(CamelContext camelContext, String exchangeId) {
+     LOG.trace("Confirming an exchange with ID {}.", exchangeId);
+     if (useRecovery) {
+         persistedCache.remove(exchangeId);
+     }
+ }
+
+ /**
+  * @return an unmodifiable view of the correlation keys held in the operational map
+  */
+ @Override
+ public Set<String> getKeys() {
+     final Set<String> operationalKeys = cache.keySet();
+     return Collections.unmodifiableSet(operationalKeys);
+ }
+
+ /**
+ * Returns the name of the {@link IMap} used as the recoverable (persistent) store.
+ * This is the name given to the constructor, or the repository name followed by
+ * "-completed" when the constructor was called without one.
+ * @return Persistent repository {@link IMap} name;
+ */
+ public String getPersistentRepositoryName() {
+ return persistenceMapName;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (maximumRedeliveries < 0) {
+ throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
+ }
+ if (recoveryInterval < 0) {
+ throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
+ }
+ ObjectHelper.notEmpty(mapName, "repositoryName");
+ if (useLocalHzInstance) {
+ Config cfg = new XmlConfigBuilder().build();
+ cfg.setProperty("hazelcast.version.check.enabled", "false");
+ hzInstance = Hazelcast.newHazelcastInstance(cfg);
+ } else {
+ ObjectHelper.notNull(hzInstance, "hzInstanse");
+ }
+ cache = hzInstance.getMap(mapName);
+ if (useRecovery) {
+ persistedCache = hzInstance.getMap(persistenceMapName);
+ }
+ }
+
+ /**
+  * Clears the maps this repository obtained and shuts down the Hazelcast instance if the
+  * repository created it itself. Every step is skipped when the corresponding resource was
+  * never initialised, so stopping after a failed or partial start does not throw a
+  * NullPointerException.
+  */
+ @Override
+ protected void doStop() throws Exception {
+     // persistedCache is only set during start when recovery was enabled at that time.
+     if (useRecovery && persistedCache != null) {
+         persistedCache.clear();
+     }
+     if (cache != null) {
+         cache.clear();
+     }
+     if (useLocalHzInstance && hzInstance != null) {
+         hzInstance.getLifecycleService().shutdown();
+     }
+ }
+
+ private Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
+ Exchange exchange = null;
+ if (holder != null) {
+ exchange = new DefaultExchange(camelContext);
+ DefaultExchangeHolder.unmarshal(exchange, holder);
+ }
+ return exchange;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryCamelTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryCamelTestSupport.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryCamelTestSupport.java
new file mode 100644
index 0000000..166c8a0
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryCamelTestSupport.java
@@ -0,0 +1,45 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Test base class that starts two Hazelcast instances before the test class runs
+ * and shuts both down afterwards, so tests can exercise a repository from two members.
+ *
+ * @author Alexander Lomov
+ * Date: 04.01.14
+ * Time: 3:00
+ */
+public class HazelcastAggregationRepositoryCamelTestSupport extends CamelTestSupport {
+    private static HazelcastInstance hzOne = null;
+    private static HazelcastInstance hzTwo = null;
+
+    /** Starts both Hazelcast instances with the default configuration. */
+    protected static void doInitializeHazelcastInstances() {
+        hzOne = Hazelcast.newHazelcastInstance();
+        hzTwo = Hazelcast.newHazelcastInstance();
+    }
+
+    /**
+     * Shuts down whichever instances were started. Tolerates a partially failed start-up
+     * (null fields) and still attempts to stop the second instance if stopping the first fails.
+     */
+    protected static void doDestroyHazelcastInstances() {
+        try {
+            if (hzOne != null) {
+                hzOne.getLifecycleService().shutdown();
+            }
+        } finally {
+            try {
+                if (hzTwo != null) {
+                    hzTwo.getLifecycleService().shutdown();
+                }
+            } finally {
+                // Do not hand out references to instances that have been shut down.
+                hzOne = null;
+                hzTwo = null;
+            }
+        }
+    }
+
+    /** @return the first Hazelcast instance, or {@code null} if it is not running */
+    protected static HazelcastInstance getFirstInstance() {
+        return hzOne;
+    }
+
+    /** @return the second Hazelcast instance, or {@code null} if it is not running */
+    protected static HazelcastInstance getSecondInstance() {
+        return hzTwo;
+    }
+
+    @BeforeClass
+    public static void setUpHazelcastCluster() {
+        doInitializeHazelcastInstances();
+    }
+
+    @AfterClass
+    public static void shutDownHazelcastCluster() {
+        doDestroyHazelcastInstances();
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryConstructorsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryConstructorsTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryConstructorsTest.java
new file mode 100644
index 0000000..5258129
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryConstructorsTest.java
@@ -0,0 +1,65 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import com.hazelcast.core.HazelcastInstance;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+
+/**
+ * Checks how the repository behaves depending on which constructor created it:
+ * locking mode enforcement, validation of an external Hazelcast instance, and
+ * use of a locally created instance.
+ */
+public class HazelcastAggregationRepositoryConstructorsTest extends CamelTestSupport {
+
+    /** A repository created without the optimistic flag must reject the optimistic add. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void nonOptimisticRepoFailsOnOptimisticAdd() throws Exception {
+        final String repoName = "hzRepoMap";
+        HazelcastAggregationRepository repo = new HazelcastAggregationRepository(repoName);
+        repo.doStart();
+
+        try {
+            Exchange oldOne = new DefaultExchange(context());
+            Exchange newOne = new DefaultExchange(context());
+            final String key = "abrakadabra";
+            repo.add(context(), key, oldOne, newOne);
+            // The message names the exception this test actually expects.
+            fail("UnsupportedOperationException should have been thrown");
+        } finally {
+            repo.doStop();
+        }
+    }
+
+    /** A repository created with the optimistic flag must reject the lock-based add. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void optimisticRepoFailsForNonOptimisticAdd() throws Exception {
+        final String repoName = "hzRepoMap";
+        HazelcastAggregationRepository repo = new HazelcastAggregationRepository(repoName, true);
+        repo.doStart();
+
+        try {
+            Exchange ex = new DefaultExchange(context());
+            final String key = "abrakadabra";
+            repo.add(context(), key, ex);
+        } finally {
+            repo.doStop();
+        }
+    }
+
+    /** Starting with an explicitly supplied null Hazelcast instance must be rejected. */
+    @Test(expected = IllegalArgumentException.class)
+    public void uninitializedHazelcastInstanceThrows() throws Exception {
+        final String repoName = "hzRepoMap";
+        HazelcastAggregationRepository repo = new HazelcastAggregationRepository(repoName, (HazelcastInstance) null);
+        repo.doStart();
+    }
+
+    /** A repository that creates its own Hazelcast instance must accept a lock-based add. */
+    @Test
+    public void locallyInitializedHazelcastInstanceAdd() throws Exception {
+        HazelcastAggregationRepository repo = new HazelcastAggregationRepository("hzRepoMap");
+        try {
+            repo.doStart();
+            Exchange ex = new DefaultExchange(context());
+            repo.add(context(), "somedefaultkey", ex);
+        } finally {
+            repo.doStop();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryOperationsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryOperationsTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryOperationsTest.java
new file mode 100644
index 0000000..fea8176
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryOperationsTest.java
@@ -0,0 +1,155 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+
+import java.util.Set;
+
+
+public class HazelcastAggregationRepositoryOperationsTest extends HazelcastAggregationRepositoryCamelTestSupport {
+
+
+ private static final String THREAD_SAFE_REPO = "threadSafeRepo";
+ private static final String OPTIMISTIC_REPO = "optimisticRepo";
+
+
+ /**
+ * Adds an exchange through one optimistic repository and then replaces it through a second
+ * repository that shares the same map name on the other Hazelcast instance.
+ */
+ @Test
+ public void checkOptimisticAddOfNewExchange() throws Exception {
+ HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(OPTIMISTIC_REPO, true, getFirstInstance());
+ HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(OPTIMISTIC_REPO, true, getSecondInstance());
+
+ try {
+ repoOne.doStart();
+ repoTwo.doStart();
+
+ final String testBody = "This is an optimistic test body. Sincerely yours, Captain Obvious.";
+ final String key = "optimisticKey";
+ Exchange newEx = createExchangeWithBody(testBody);
+ // First insert: no previous exchange is passed, so the optimistic add returns null.
+ Exchange oldEx = repoOne.add(context(), key, null, newEx);
+
+ assertNull("Old exchange should be null.", oldEx);
+
+ final String theNewestBody = "This is the newest test body.";
+ Exchange theNewestEx = createExchangeWithBody(theNewestBody);
+
+ // Replace via the second repository; the optimistic add hands back the old exchange it was given.
+ oldEx = repoTwo.add(context(), key, newEx, theNewestEx);
+ assertEquals(newEx.getIn().getBody(), oldEx.getIn().getBody());
+
+ } finally {
+ repoOne.stop();
+ repoTwo.stop();
+ }
+ }
+
+ /**
+ * Adds an exchange through one lock-based repository and then overwrites it through a second
+ * repository that shares the same map name on the other Hazelcast instance.
+ */
+ @Test
+ public void checkThreadSafeAddOfNewExchange() throws Exception {
+ HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(THREAD_SAFE_REPO, false, getFirstInstance());
+ HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(THREAD_SAFE_REPO, false, getSecondInstance());
+
+ try {
+ repoOne.doStart();
+ repoTwo.doStart();
+
+ final String testBody = "This is a thread-safe test body. Sincerely yours, Captain Obvious.";
+ final String key = "threadSafeKey";
+ Exchange newEx = createExchangeWithBody(testBody);
+ // First insert: nothing is stored under the key yet, so no previous exchange comes back.
+ Exchange oldEx = repoOne.add(context(), key, newEx);
+
+ assertNull("Old exchange should be null.", oldEx);
+
+ final String theNewestBody = "This is the newest test body.";
+ Exchange theNewestEx = createExchangeWithBody(theNewestBody);
+
+ // Second insert via the other repository returns the exchange stored by the first one.
+ oldEx = repoTwo.add(context(), key, theNewestEx);
+ assertEquals(newEx.getIn().getBody(), oldEx.getIn().getBody());
+
+ } finally {
+ repoOne.stop();
+ repoTwo.stop();
+ }
+ }
+
+ /**
+  * Verifies that an exchange added through one optimistic repository can be read back through
+  * a second repository that shares the same map name on the other Hazelcast instance.
+  */
+ @Test
+ public void checkOptimisticGet() throws Exception {
+     // Optimistic test, so use the optimistic repository name.
+     HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(OPTIMISTIC_REPO, true, getFirstInstance());
+     HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(OPTIMISTIC_REPO, true, getSecondInstance());
+     try {
+         repoOne.doStart();
+         repoTwo.doStart();
+
+         final String testBody = "This is an optimistic test body. Sincerely yours, Captain Obvious.";
+         final String key = "optimisticKey";
+
+         Exchange ex = createExchangeWithBody(testBody);
+         repoOne.add(context(), key, null, ex);
+
+         Exchange gotEx = repoTwo.get(context(), key);
+         assertNotNull("gotEx should not be null.", gotEx);
+         // Expected value first, actual value second, so failure messages read correctly.
+         assertEquals("ex and gotEx should be equal", ex.getIn().getBody(), gotEx.getIn().getBody());
+     } finally {
+         repoOne.doStop();
+         repoTwo.doStop();
+     }
+ }
+
+ /**
+  * Verifies that an exchange added through one lock-based repository can be read back through
+  * a second repository that shares the same map name on the other Hazelcast instance.
+  */
+ @Test
+ public void checkThreadSafeGet() throws Exception {
+     // Lock-based test, so use the thread-safe repository name.
+     HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(THREAD_SAFE_REPO, false, getFirstInstance());
+     HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(THREAD_SAFE_REPO, false, getSecondInstance());
+
+     try {
+         repoOne.doStart();
+         repoTwo.doStart();
+
+         final String testBody = "This is a thread-safe test body. Sincerely yours, Captain Obvious.";
+         final String key = "threadSafeKey";
+
+         Exchange ex = createExchangeWithBody(testBody);
+         repoOne.add(context(), key, ex);
+
+         Exchange gotEx = repoTwo.get(context(), key);
+         assertNotNull("gotEx should not be null.", gotEx);
+         // Expected value first, actual value second, so failure messages read correctly.
+         assertEquals("ex and gotEx should be equal", ex.getIn().getBody(), gotEx.getIn().getBody());
+     } finally {
+         repoOne.doStop();
+         repoTwo.doStop();
+     }
+ }
+
+ /**
+ * Verifies that removing an exchange in optimistic mode with recovery enabled takes it out of
+ * the operational map and makes it available through scan and recover on the recoverable map.
+ */
+ @Test
+ public void checkOptimisticPersistentRemove() throws Exception {
+ final String persistentRepoName = String.format("%s-completed", OPTIMISTIC_REPO);
+ HazelcastAggregationRepository repoOne = new HazelcastAggregationRepository(OPTIMISTIC_REPO, persistentRepoName, true, getFirstInstance());
+ HazelcastAggregationRepository repoTwo = new HazelcastAggregationRepository(OPTIMISTIC_REPO, persistentRepoName, true, getSecondInstance());
+
+ try {
+ repoOne.doStart();
+ repoTwo.doStart();
+
+ final String testBody = "This is an optimistic test body. Sincerely yours, Captain Obvious.";
+ final String key = "optimisticKey";
+
+ Exchange ex = createExchangeWithBody(testBody);
+
+ repoOne.add(context(), key, null, ex);
+
+ // The entry written through the first repository is visible through the second.
+ Exchange getBackEx = repoTwo.get(context(), key);
+ assertNotNull("getBackEx should not be null.", getBackEx);
+
+ repoTwo.remove(context(), key, ex);
+
+ // After the remove the operational map no longer holds the key.
+ getBackEx = repoOne.get(context(), key);
+ assertNull("getBackEx should be null here.", getBackEx);
+
+ // The removed exchange is the only entry left to recover.
+ Set<String> keys = repoTwo.scan(context());
+ assertCollectionSize(keys, 1);
+
+ getBackEx = repoOne.recover(context(), keys.iterator().next());
+ assertNotNull("getBackEx got from persistent repo should not be null.", getBackEx);
+
+
+ } finally {
+ repoOne.doStop();
+ repoTwo.doStop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRecoverableRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRecoverableRoutesTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRecoverableRoutesTest.java
new file mode 100644
index 0000000..cea5903
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRecoverableRoutesTest.java
@@ -0,0 +1,133 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @author Alexander Lomov
+ * Date: 04.01.14
+ * Time: 4:37
+ */
+
+public class HazelcastAggregationRepositoryRecoverableRoutesTest extends HazelcastAggregationRepositoryCamelTestSupport {
+
+ private static final String REPO_NAME = "routeTestRepo";
+ private static final String MOCK_GOTCHA = "mock:gotcha";
+ private static final String MOCK_FAILURE = "mock:failure";
+ private static final String DIRECT_ONE = "direct:one";
+ private static final String DIRECT_TWO = "direct:two";
+
+ @EndpointInject(uri = MOCK_GOTCHA)
+ private MockEndpoint mockGotcha;
+
+ @EndpointInject(uri = MOCK_FAILURE)
+ private MockEndpoint mockFailure;
+
+ @Produce(uri = DIRECT_ONE)
+ private ProducerTemplate produceOne;
+
+ @Produce(uri = DIRECT_TWO)
+ private ProducerTemplate produceTwo;
+
+ @Test
+ public void checkAggregationFromTwoRoutesWithRecovery() throws Exception {
+ final HazelcastAggregationRepository repoOne =
+ new HazelcastAggregationRepository(REPO_NAME, false, getFirstInstance());
+
+ final HazelcastAggregationRepository repoTwo =
+ new HazelcastAggregationRepository(REPO_NAME, false, getSecondInstance());
+
+ final int completionSize = 4;
+ final String correlator = "CORRELATOR";
+
+ RouteBuilder rbOne = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ onException(EverythingIsLostException.class)
+ .handled(true)
+ .useOriginalMessage()
+ .to(MOCK_GOTCHA)
+ .end();
+
+ interceptSendToEndpoint(MOCK_FAILURE)
+ .throwException(new EverythingIsLostException("The field is lost... everything is lost"))
+ .end();
+
+ from(DIRECT_ONE)
+ .aggregate(header(correlator))
+ .aggregationRepository(repoOne)
+ .aggregationStrategy(new SumOfIntsAggregationStrategy())
+ .completionSize(completionSize)
+ .to(MOCK_FAILURE);
+
+ }
+ };
+
+
+ RouteBuilder rbTwo = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ onException(EverythingIsLostException.class)
+ .handled(true)
+ .useOriginalMessage()
+ .to(MOCK_GOTCHA)
+ .end();
+
+ interceptSendToEndpoint(MOCK_FAILURE)
+ .throwException(new EverythingIsLostException("The field is lost... everything is lost"))
+ .end();
+
+ from(DIRECT_TWO)
+ .aggregate(header(correlator))
+ .aggregationRepository(repoTwo)
+ .aggregationStrategy(new SumOfIntsAggregationStrategy())
+ .completionSize(completionSize)
+ .to(MOCK_FAILURE);
+ }
+ };
+
+ context().addRoutes(rbOne);
+ context().addRoutes(rbTwo);
+ context().start();
+
+ mockFailure.expectedMessageCount(0);
+ mockGotcha.expectedMessageCount(1);
+ mockGotcha.expectedBodiesReceived(1 + 2 + 3 + 4);
+
+ produceOne.sendBodyAndHeader(4, correlator, correlator);
+ produceTwo.sendBodyAndHeader(2, correlator, correlator);
+ produceOne.sendBodyAndHeader(3, correlator, correlator);
+ produceTwo.sendBodyAndHeader(1, correlator, correlator);
+
+ mockFailure.assertIsSatisfied();
+ mockFailure.assertIsSatisfied();
+ }
+
+ @SuppressWarnings("unused")
+ private static class EverythingIsLostException extends Exception {
+ private EverythingIsLostException() {
+ }
+
+ private EverythingIsLostException(String message) {
+ super(message);
+ }
+
+ private EverythingIsLostException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ private EverythingIsLostException(Throwable cause) {
+ super(cause);
+ }
+
+ private EverythingIsLostException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
new file mode 100644
index 0000000..2b16a5d
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/HazelcastAggregationRepositoryRoutesTest.java
@@ -0,0 +1,84 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @author Alexander Lomov
+ * Date: 04.01.14
+ * Time: 2:40
+ */
+
+public class HazelcastAggregationRepositoryRoutesTest extends HazelcastAggregationRepositoryCamelTestSupport {
+
+ private static final String REPO_NAME = "routeTestRepo";
+ private static final String MOCK_GOTCHA = "mock:gotcha";
+ private static final String DIRECT_ONE = "direct:one";
+ private static final String DIRECT_TWO = "direct:two";
+
+ @EndpointInject(uri = MOCK_GOTCHA)
+ private MockEndpoint mock;
+
+ @Produce(uri = DIRECT_ONE)
+ private ProducerTemplate produceOne;
+
+ @Produce(uri = DIRECT_TWO)
+ private ProducerTemplate produceTwo;
+
+
+ @Test
+ public void checkAggregationFromTwoRoutes() throws Exception {
+ final HazelcastAggregationRepository repoOne =
+ new HazelcastAggregationRepository(REPO_NAME, false, getFirstInstance());
+
+ final HazelcastAggregationRepository repoTwo =
+ new HazelcastAggregationRepository(REPO_NAME, false, getSecondInstance());
+
+ final int completionSize = 4;
+ final String correlator = "CORRELATOR";
+ RouteBuilder rbOne = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ from(DIRECT_ONE).routeId("AggregatingRouteOne")
+ .aggregate(header(correlator))
+ .aggregationRepository(repoOne)
+ .aggregationStrategy(new SumOfIntsAggregationStrategy())
+ .completionSize(completionSize)
+ .to(MOCK_GOTCHA);
+ }
+ };
+
+ RouteBuilder rbTwo = new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+
+ from(DIRECT_TWO).routeId("AggregatingRouteTwo")
+ .aggregate(header(correlator))
+ .aggregationRepository(repoTwo)
+ .aggregationStrategy(new SumOfIntsAggregationStrategy())
+ .completionSize(completionSize)
+ .to(MOCK_GOTCHA);
+ }
+ };
+
+ context().addRoutes(rbOne);
+ context().addRoutes(rbTwo);
+ context().start();
+
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(1 + 2 + 3 + 4);
+
+ produceOne.sendBodyAndHeader(1, correlator, correlator);
+ produceTwo.sendBodyAndHeader(2, correlator, correlator);
+ produceOne.sendBodyAndHeader(3, correlator, correlator);
+ produceOne.sendBodyAndHeader(4, correlator, correlator);
+
+ mock.assertIsSatisfied();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b45fff0a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/SumOfIntsAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/SumOfIntsAggregationStrategy.java b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/SumOfIntsAggregationStrategy.java
new file mode 100644
index 0000000..bf88b63
--- /dev/null
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/processor/aggregate/hazelcast/SumOfIntsAggregationStrategy.java
@@ -0,0 +1,24 @@
+package org.apache.camel.processor.aggregate.hazelcast;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Aggregation strategy used by the tests: keeps a running integer sum in the
+ * body of the accumulated exchange.
+ *
+ * @author Alexander Lomov
+ *         Date: 04.01.14
+ *         Time: 13:25
+ */
+class SumOfIntsAggregationStrategy implements AggregationStrategy {
+
+    /**
+     * Adds the integer body of {@code newExchange} to the integer body of {@code oldExchange}.
+     *
+     * @param oldExchange the exchange accumulated so far, or {@code null} for the first message
+     * @param newExchange the newly arrived exchange
+     * @return {@code newExchange} unchanged when there is no accumulated exchange yet; otherwise
+     *         {@code oldExchange} with its body replaced by the sum (a body that does not
+     *         convert to {@code Integer} counts as zero)
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // First message of a group: nothing to add yet.
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
+        Integer incoming = newExchange.getIn().getBody(Integer.class);
+        Integer accumulated = oldExchange.getIn().getBody(Integer.class);
+
+        Integer total = zeroIfNull(accumulated) + zeroIfNull(incoming);
+        oldExchange.getIn().setBody(total, Integer.class);
+        return oldExchange;
+    }
+
+    /** Treats a missing value as zero so it does not affect the sum. */
+    private static int zeroIfNull(Integer value) {
+        if (value == null) {
+            return 0;
+        }
+        return value;
+    }
+}