You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/16 15:39:04 UTC

[03/13] camel git commit: CAMEL-10054: Create camel-atomix component

CAMEL-10054: Create camel-atomix component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49076653
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49076653
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49076653

Branch: refs/heads/master
Commit: 490766539527f823a32cb4aaffa46005956835c7
Parents: b6f1bdd
Author: lburgazzoli <lb...@gmail.com>
Authored: Thu Jun 1 12:27:37 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Fri Jun 16 17:37:53 2017 +0200

----------------------------------------------------------------------
 .../atomix/ha/AtomixRoutePolicyMain.java        | 49 +++++++++++++-------
 1 file changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/49076653/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
index 78ef149..0a126d3 100644
--- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
+++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyMain.java
@@ -36,6 +36,7 @@ import org.apache.camel.ha.CamelCluster;
 import org.apache.camel.ha.CamelClusterView;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.ha.ClusteredRoutePolicy;
+import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.test.AvailablePortFinder;
 import org.apache.camel.util.FileUtil;
 import org.slf4j.Logger;
@@ -44,23 +45,27 @@ import org.slf4j.LoggerFactory;
 public final class AtomixRoutePolicyMain {
     private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyMain.class);
 
+
     private static final List<Address> ADDRESSES =  Arrays.asList(
         new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
         new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
         new Address("127.0.0.1", AvailablePortFinder.getNextAvailable())
     );
 
-    private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(ADDRESSES.size() * 3);
+    private static final CountDownLatch LATCH = new CountDownLatch(ADDRESSES.size());
+    private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(ADDRESSES.size() * 2);
 
     public static void main(final String[] args) throws Exception {
         for (Address address :  ADDRESSES) {
-            EXECUTOR.submit(() -> setupContext(address));
+            EXECUTOR.submit(() -> run(address));
         }
 
-        EXECUTOR.awaitTermination(5, TimeUnit.MINUTES);
+        LATCH.await();
+
+        System.exit(0);
     }
 
-    static void setupContext(Address address) {
+    static void run(Address address) {
         try {
             final String id = String.format("atomix-%d", address.port());
             final File path = new File("target", id);
@@ -73,7 +78,7 @@ public final class AtomixRoutePolicyMain {
                 .withStorage(
                     Storage.builder()
                         .withDirectory(path)
-                        .withStorageLevel(StorageLevel.DISK)
+                        .withStorageLevel(StorageLevel.MEMORY)
                         .build())
                 .build()
                 .bootstrap(ADDRESSES)
@@ -86,34 +91,44 @@ public final class AtomixRoutePolicyMain {
 
             view.addEventListener((e, p) -> {
                 if (view.getLocalMember().isMaster()) {
-                    LOGGER.info("{}, is now master", address);
-                    try {
-                        EXECUTOR.schedule(latch::countDown, 10, TimeUnit.SECONDS);
-                    } catch (Exception ex) {
-                        throw new RuntimeException(ex);
-                    }
+                    LOGGER.info("Member {} ({}), is now master", view.getLocalMember().getId(), address);
+
+                    // Shut down the context later on so that the next member
+                    // can take over the leadership
+                    EXECUTOR.schedule(latch::countDown, 10, TimeUnit.SECONDS);
                 }
             });
 
-            context.disableJMX();
-            context.addService(cluster, true, true);
+            context.addService(cluster);
             context.addRoutes(new RouteBuilder() {
                 @Override
                 public void configure() throws Exception {
-                    fromF("timer:%s?period=1s", id)
-                        .routeId(id)
-                        .routePolicy(new ClusteredRoutePolicy(view))
+                    RoutePolicy policy = ClusteredRoutePolicy.forView(view);
+
+                    fromF("timer:%s-1?period=2s", id)
+                        .routeId(id + "-1")
+                        .routePolicy(policy)
                         .setHeader("ClusterMaster")
                             .body(b -> view.getMaster().getId())
-                        .log("${routeId} - master is: ${header.ClusterMaster}");
+                        .log("${routeId} (1) - master is: ${header.ClusterMaster}");
+                    fromF("timer:%s-2?period=5s", id)
+                        .routeId(id + "-2")
+                        .routePolicy(policy)
+                        .setHeader("ClusterMaster")
+                            .body(b -> view.getMaster().getId())
+                        .log("${routeId} (2) - master is: ${header.ClusterMaster}");
                 }
             });
 
             context.start();
             latch.await();
             context.stop();
+
+            LATCH.countDown();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+
+        LOGGER.info("Done {}", address);
     }
 }