You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/10/12 16:54:40 UTC
[3/3] camel git commit: CAMEL-11899: cluster-service : fire event on listener registration
CAMEL-11899: cluster-service : fire event on listener registration
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6682a356
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6682a356
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6682a356
Branch: refs/heads/master
Commit: 6682a35622ec3b7546f3337bce0255564b641334
Parents: a232ab0
Author: lburgazzoli <lb...@gmail.com>
Authored: Thu Oct 12 17:10:39 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Thu Oct 12 18:54:08 2017 +0200
----------------------------------------------------------------------
.../camel/impl/ha/AbstractCamelClusterView.java | 32 +++++++++++++++++++-
.../camel/impl/ha/ClusteredRoutePolicy.java | 1 -
.../camel/impl/ha/ClusterServiceViewTest.java | 32 ++++++++++++++++++++
components/camel-consul/pom.xml | 2 +-
.../camel/component/master/MasterConsumer.java | 4 ---
.../zookeeper/ha/ZooKeeperClusterView.java | 7 +++++
.../src/test/resources/log4j2.properties | 4 ++-
7 files changed, 74 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
index de91976..a6d3e25 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
@@ -66,11 +66,41 @@ public abstract class AbstractCamelClusterView extends ServiceSupport implements
@Override
public void addEventListener(CamelClusterEventListener listener) {
- LockHelper.doWithWriteLock(lock, () -> listeners.add(listener));
+ if (listener == null) {
+ return;
+ }
+
+ LockHelper.doWithWriteLock(
+ lock,
+ () -> {
+ listeners.add(listener);
+
+ if (isRunAllowed()) {
+ // if the view has already been started, fire known events so
+ // the consumer can catch up.
+
+ if (CamelClusterEventListener.Leadership.class.isInstance(listener)) {
+ CamelClusterEventListener.Leadership.class.cast(listener).leadershipChanged(this, getMaster());
+ }
+
+ if (CamelClusterEventListener.Membership.class.isInstance(listener)) {
+ CamelClusterEventListener.Membership ml = CamelClusterEventListener.Membership.class.cast(listener);
+
+ for (CamelClusterMember member: getMembers()) {
+ ml.memberAdded(this, member);
+ }
+ }
+ }
+ }
+ );
}
@Override
public void removeEventListener(CamelClusterEventListener listener) {
+ if (listener == null) {
+ return;
+ }
+
LockHelper.doWithWriteLock(lock, () -> listeners.removeIf(l -> l == listener));
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
index b537ca7..a733394 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
@@ -283,7 +283,6 @@ public final class ClusteredRoutePolicy extends RoutePolicySupport implements Ca
);
clusterView.addEventListener(leadershipEventListener);
- setLeader(clusterView.getLocalMember().isLeader());
}
// ****************************************************
http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java
index 0948719..2a6cddd 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/ha/ClusterServiceViewTest.java
@@ -146,6 +146,38 @@ public class ClusterServiceViewTest {
);
}
+ @Test
+ public void testLateViewListeners() throws Exception {
+ final TestClusterService service = new TestClusterService(UUID.randomUUID().toString());
+ final TestClusterView view = service.getView("ns1").unwrap(TestClusterView.class);
+ final int events = 1 + new Random().nextInt(10);
+ final Set<Integer> results = new HashSet<>();
+ final CountDownLatch latch = new CountDownLatch(events * 2);
+
+ IntStream.range(0, events).forEach(
+ i -> view.addEventListener((CamelClusterEventListener.Leadership) (v, l) -> {
+ results.add(i);
+ latch.countDown();
+ })
+ );
+
+ service.start();
+ view.setLeader(true);
+
+ IntStream.range(events, events * 2).forEach(
+ i -> view.addEventListener((CamelClusterEventListener.Leadership) (v, l) -> {
+ results.add(i);
+ latch.countDown();
+ })
+ );
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ IntStream.range(0, events * 2).forEach(
+ i -> Assert.assertTrue(results.contains(i))
+ );
+ }
+
// *********************************
// Helpers
// *********************************
http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-consul/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-consul/pom.xml b/components/camel-consul/pom.xml
index f84e750..9d92d85 100644
--- a/components/camel-consul/pom.xml
+++ b/components/camel-consul/pom.xml
@@ -174,7 +174,7 @@
<port>consul.port:8500</port>
</ports>
<wait>
- <log>agent: Synced service 'consul'</log>
+ <log>agent: Synced node info</log>
<time>20000</time>
</wait>
<cmd>
http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index c7edd91..8d3815f 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -69,10 +69,6 @@ public class MasterConsumer extends DefaultConsumer {
view = clusterService.getView(masterEndpoint.getNamespace());
view.addEventListener(leadershipListener);
-
- if (isMaster()) {
- onLeadershipTaken();
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
index 6dab19e..3d313ab 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ha/ZooKeeperClusterView.java
@@ -31,6 +31,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +68,9 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView {
return ObjectHelper.equal(participant.getId(), localMember.getId())
? Optional.of(localMember)
: Optional.of(new CuratorClusterMember(participant));
+ } catch (KeeperException.NoNodeException e) {
+ LOGGER.debug("Failed to get get master because node '{}' does not yet exist (error: '{}')", configuration.getBasePath(), e.getMessage());
+ return Optional.empty();
} catch (Exception e) {
throw new RuntimeCamelException(e);
}
@@ -83,6 +87,9 @@ final class ZooKeeperClusterView extends AbstractCamelClusterView {
.stream()
.map(CuratorClusterMember::new)
.collect(Collectors.toList());
+ } catch (KeeperException.NoNodeException e) {
+ LOGGER.debug("Failed to get members because node '{}' does not yet exist (error: '{}')", configuration.getBasePath(), e.getMessage());
+ return Collections.emptyList();
} catch (Exception e) {
throw new RuntimeCamelException(e);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/6682a356/components/camel-zookeeper/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties
index 1dda6f0..5318521 100644
--- a/components/camel-zookeeper/src/test/resources/log4j2.properties
+++ b/components/camel-zookeeper/src/test/resources/log4j2.properties
@@ -35,6 +35,8 @@ logger.camel-zookeeper.name = org.apache.camel.component.zookeeper
logger.camel-zookeeper.level = INFO
logger.camel-zookeeper-policy.name = org.apache.camel.component.zookeeper.policy
logger.camel-zookeeper-policy.level = INFO
+logger.camel-zookeeper-ha.name = org.apache.camel.component.zookeeper.ha
+logger.camel-zookeeper-ha.level = DEBUG
logger.camel-ha.name = org.apache.camel.ha
logger.camel-ha.level = DEBUG
logger.camel-impl-ha.name = org.apache.camel.impl.ha
@@ -46,5 +48,5 @@ logger.springframework.name = org.springframework
logger.springframework.level = WARN
rootLogger.level = INFO
-#rootLogger.appenderRef.stdout.ref = out
+//rootLogger.appenderRef.stdout.ref = out
rootLogger.appenderRef.file.ref = file