You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/16 15:39:04 UTC
[03/13] camel git commit: CAMEL-10054: Create camel-atomix component
CAMEL-10054: Create camel-atomix component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49076653
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49076653
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49076653
Branch: refs/heads/master
Commit: 490766539527f823a32cb4aaffa46005956835c7
Parents: b6f1bdd
Author: lburgazzoli <lb...@gmail.com>
Authored: Thu Jun 1 12:27:37 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Fri Jun 16 17:37:53 2017 +0200
----------------------------------------------------------------------
.../atomix/ha/AtomixRoutePolicyMain.java | 49 +++++++++++++-------
1 file changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/49076653/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
index 78ef149..0a126d3 100644
--- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
+++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
@@ -36,6 +36,7 @@ import org.apache.camel.ha.CamelCluster;
import org.apache.camel.ha.CamelClusterView;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.ha.ClusteredRoutePolicy;
+import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.util.FileUtil;
import org.slf4j.Logger;
@@ -44,23 +45,27 @@ import org.slf4j.LoggerFactory;
public final class AtomixRoutePolicyMain {
private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyMain.class);
+
private static final List<Address> ADDRESSES = Arrays.asList(
new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
new Address("127.0.0.1", AvailablePortFinder.getNextAvailable())
);
- private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(ADDRESSES.size() * 3);
+ private static final CountDownLatch LATCH = new CountDownLatch(ADDRESSES.size());
+ private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(ADDRESSES.size() * 2);
public static void main(final String[] args) throws Exception {
for (Address address : ADDRESSES) {
- EXECUTOR.submit(() -> setupContext(address));
+ EXECUTOR.submit(() -> run(address));
}
- EXECUTOR.awaitTermination(5, TimeUnit.MINUTES);
+ LATCH.await();
+
+ System.exit(0);
}
- static void setupContext(Address address) {
+ static void run(Address address) {
try {
final String id = String.format("atomix-%d", address.port());
final File path = new File("target", id);
@@ -73,7 +78,7 @@ public final class AtomixRoutePolicyMain {
.withStorage(
Storage.builder()
.withDirectory(path)
- .withStorageLevel(StorageLevel.DISK)
+ .withStorageLevel(StorageLevel.MEMORY)
.build())
.build()
.bootstrap(ADDRESSES)
@@ -86,34 +91,44 @@ public final class AtomixRoutePolicyMain {
view.addEventListener((e, p) -> {
if (view.getLocalMember().isMaster()) {
- LOGGER.info("{}, is now master", address);
- try {
- EXECUTOR.schedule(latch::countDown, 10, TimeUnit.SECONDS);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
+ LOGGER.info("Member {} ({}), is now master", view.getLocalMember().getId(), address);
+
+ // Shutdown the context later on so the next one should take
+ // the leadership
+ EXECUTOR.schedule(latch::countDown, 10, TimeUnit.SECONDS);
}
});
- context.disableJMX();
- context.addService(cluster, true, true);
+ context.addService(cluster);
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- fromF("timer:%s?period=1s", id)
- .routeId(id)
- .routePolicy(new ClusteredRoutePolicy(view))
+ RoutePolicy policy = ClusteredRoutePolicy.forView(view);
+
+ fromF("timer:%s-1?period=2s", id)
+ .routeId(id + "-1")
+ .routePolicy(policy)
.setHeader("ClusterMaster")
.body(b -> view.getMaster().getId())
- .log("${routeId} - master is: ${header.ClusterMaster}");
+ .log("${routeId} (1) - master is: ${header.ClusterMaster}");
+ fromF("timer:%s-2?period=5s", id)
+ .routeId(id + "-2")
+ .routePolicy(policy)
+ .setHeader("ClusterMaster")
+ .body(b -> view.getMaster().getId())
+ .log("${routeId} (2) - master is: ${header.ClusterMaster}");
}
});
context.start();
latch.await();
context.stop();
+
+ LATCH.countDown();
} catch (Exception e) {
throw new RuntimeException(e);
}
+
+ LOGGER.info("Done {}", address);
}
}