You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/10/12 16:54:40 UTC

[3/3] camel git commit: CAMEL-11899: cluster-service : fire event on listener registration

CAMEL-11899: cluster-service : fire event on listener registration


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6682a356
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6682a356
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6682a356

Branch: refs/heads/master
Commit: 6682a35622ec3b7546f3337bce0255564b641334
Parents: a232ab0
Author: lburgazzoli <lb...@gmail.com>
Authored: Thu Oct 12 17:10:39 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Thu Oct 12 18:54:08 2017 +0200

----------------------------------------------------------------------
 .../camel/impl/ha/AbstractCamelClusterView.java | 32 +++++++++++++++++++-
 .../camel/impl/ha/ClusteredRoutePolicy.java     |  1 -
 .../camel/impl/ha/ClusterServiceViewTest.java   | 32 ++++++++++++++++++++
 components/camel-consul/pom.xml                 |  2 +-
 .../camel/component/master/MasterConsumer.java  |  4 ---
 .../zookeeper/ha/ZooKeeperClusterView.java      |  7 +++++
 .../src/test/resources/log4j2.properties        |  4 ++-
 7 files changed, 74 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
index de91976..a6d3e25 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
@@ -66,11 +66,41 @@ public abstract class AbstractCamelClusterView extends ServiceSupport implements
 
     @Override
     public void addEventListener(CamelClusterEventListener listener) {
-        LockHelper.doWithWriteLock(lock, () -> listeners.add(listener));
+        if (listener == null) {
+            return;
+        }
+
+        LockHelper.doWithWriteLock(
+            lock,
+            () -> {
+                listeners.add(listener);
+
+                if (isRunAllowed()) {
+                    // if the view has already been started, fire known events so
+                    // the consumer can catch up.
+
+                    if (CamelClusterEventListener.Leadership.class.isInstance(listener)) {
+                        CamelClusterEventListener.Leadership.class.cast(listener).leadershipChanged(this, getMaster());
+                    }
+
+                    if (CamelClusterEventListener.Membership.class.isInstance(listener)) {
+                        CamelClusterEventListener.Membership ml = CamelClusterEventListener.Membership.class.cast(listener);
+
+                        for (CamelClusterMember member: getMembers()) {
+                            ml.memberAdded(this, member);
+                        }
+                    }
+                }
+            }
+        );
     }
 
     @Override
     public void removeEventListener(CamelClusterEventListener listener) {
+        if (listener == null) {
+            return;
+        }
+
         LockHelper.doWithWriteLock(lock, () -> listeners.removeIf(l -> l == listener));
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
index b537ca7..a733394 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
@@ -283,7 +283,6 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca
         );
 
         clusterView.addEventListener(leadershipEventListener);
-        setLeader(clusterView.getLocalMember().isLeader());
     }
 
     // ****************************************************

http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java
index 0948719..2a6cddd 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java
@@ -146,6 +146,38 @@ public class ClusterServiceViewTest {
         );
     }
 
+    @Test
+    public void testLateViewListeners() throws Exception {
+        final TestClusterService service = new TestClusterService(UUID.randomUUID().toString());
+        final TestClusterView view = service.getView("ns1").unwrap(TestClusterView.class);
+        final int events = 1 + new Random().nextInt(10);
+        final Set<Integer> results = new HashSet<>();
+        final CountDownLatch latch = new CountDownLatch(events * 2);
+
+        IntStream.range(0, events).forEach(
+            i -> view.addEventListener((CamelClusterEventListener.Leadership) (v, l) -> {
+                results.add(i);
+                latch.countDown();
+            })
+        );
+
+        service.start();
+        view.setLeader(true);
+
+        IntStream.range(events, events * 2).forEach(
+            i -> view.addEventListener((CamelClusterEventListener.Leadership) (v, l) -> {
+                results.add(i);
+                latch.countDown();
+            })
+        );
+
+        latch.await(10, TimeUnit.SECONDS);
+
+        IntStream.range(0, events * 2).forEach(
+            i -> Assert.assertTrue(results.contains(i))
+        );
+    }
+
     // *********************************
     // Helpers
     // *********************************

http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-consul/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml
index f84e750..9d92d85 100644
--- a/components/camel-consul/pom.xml
+++ b/components/camel-consul/pom.xml
@@ -174,7 +174,7 @@
                           <port>consul.port:8500</port>
                         </ports>
                         <wait>
-                          <log>agent: Synced service 'consul'</log>
+                          <log>agent: Synced node info</log>
                           <time>20000</time>
                         </wait>
                         <cmd>

http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index c7edd91..8d3815f 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -69,10 +69,6 @@ public class MasterConsumer extends DefaultConsumer {
 
         view = clusterService.getView(masterEndpoint.getNamespace());
         view.addEventListener(leadershipListener);
-
-        if (isMaster()) {
-            onLeadershipTaken();
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
index 6dab19e..3d313ab 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
 import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +68,9 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView {
             return ObjectHelper.equal(participant.getId(), localMember.getId())
                 ? Optional.of(localMember)
                 : Optional.of(new CuratorClusterMember(participant));
+        } catch (KeeperException.NoNodeException e) {
+            LOGGER.debug("Failed to get master because node '{}' does not yet exist (error: '{}')", configuration.getBasePath(), e.getMessage());
+            return Optional.empty();
         } catch (Exception e) {
             throw new RuntimeCamelException(e);
         }
@@ -83,6 +87,9 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView {
                 .stream()
                 .map(CuratorClusterMember::new)
                 .collect(Collectors.toList());
+        } catch (KeeperException.NoNodeException e) {
+            LOGGER.debug("Failed to get members because node '{}' does not yet exist (error: '{}')", configuration.getBasePath(), e.getMessage());
+            return Collections.emptyList();
         } catch (Exception e) {
             throw new RuntimeCamelException(e);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-zookeeper/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties
index 1dda6f0..5318521 100644
--- a/components/camel-zookeeper/src/test/resources/log4j2.properties
+++ b/components/camel-zookeeper/src/test/resources/log4j2.properties
@@ -35,6 +35,8 @@ logger.camel-zookeeper.name = org.apache.camel.component.zookeeper
 logger.camel-zookeeper.level = INFO
 logger.camel-zookeeper-policy.name = org.apache.camel.component.zookeeper.policy
 logger.camel-zookeeper-policy.level = INFO
+logger.camel-zookeeper-ha.name = org.apache.camel.component.zookeeper.ha
+logger.camel-zookeeper-ha.level = DEBUG
 logger.camel-ha.name = org.apache.camel.ha
 logger.camel-ha.level = DEBUG
 logger.camel-impl-ha.name = org.apache.camel.impl.ha
@@ -46,5 +48,5 @@ logger.springframework.name = org.springframework
 logger.springframework.level = WARN
 
 rootLogger.level = INFO
-#rootLogger.appenderRef.stdout.ref = out
+//rootLogger.appenderRef.stdout.ref = out
 rootLogger.appenderRef.file.ref = file