You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2017/08/23 02:56:04 UTC
incubator-gossip git commit: GOSSIP-38 Multiple async GossipListeners
Repository: incubator-gossip
Updated Branches:
refs/heads/master 968571a56 -> ac8303893
GOSSIP-38 Multiple async GossipListeners
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/ac830389
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/ac830389
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/ac830389
Branch: refs/heads/master
Commit: ac830389327508688c664a01463d1ddfa3fb6721
Parents: 968571a
Author: pxsalehi <px...@gmail.com>
Authored: Mon Aug 21 11:58:15 2017 +0200
Committer: pxsalehi <px...@gmail.com>
Committed: Mon Aug 21 11:58:15 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipSettings.java | 2 +-
.../src/main/java/org/apache/gossip/Member.java | 2 +-
.../apache/gossip/manager/GossipManager.java | 7 +-
.../gossip/manager/GossipManagerBuilder.java | 3 +-
.../manager/GossipMemberStateRefresher.java | 50 +++++++--
.../gossip/manager/SimpleActiveGossiper.java | 110 +++++++++++++++++++
.../gossip/manager/SimpleActiveGossipper.java | 110 -------------------
.../gossip/transport/TransportManager.java | 2 +-
8 files changed, 164 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 792af85..32c00c9 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -43,7 +43,7 @@ public class GossipSettings {
private String distribution = "normal";
- private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
+ private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossiper";
private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager";
private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager";
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/Member.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/Member.java b/gossip-base/src/main/java/org/apache/gossip/Member.java
index d04a7b6..54a6737 100644
--- a/gossip-base/src/main/java/org/apache/gossip/Member.java
+++ b/gossip-base/src/main/java/org/apache/gossip/Member.java
@@ -22,7 +22,7 @@ import java.net.URI;
import java.util.Map;
/**
- * A abstract class representing a gossip member.
+ * An abstract class representing a gossip member.
*
*/
public abstract class Member implements Comparable<Member> {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index d839b2e..db442c6 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -185,7 +185,7 @@ public abstract class GossipManager {
if (settings.isPersistDataState()) {
scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
}
- scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
+ memberStateRefresher.init();
LOGGER.debug("The GossipManager is started.");
}
@@ -224,6 +224,7 @@ public abstract class GossipManager {
gossipCore.shutdown();
transportManager.shutdown();
dataReaper.close();
+ memberStateRefresher.shutdown();
scheduledServiced.shutdown();
try {
scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
@@ -366,4 +367,8 @@ public abstract class GossipManager {
public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
gossipCore.unregisterSharedDataSubscriber(handler);
}
+
+ public void registerGossipListener(GossipListener listener) {
+ memberStateRefresher.register(listener);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
index 86dca57..f3ca23a 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -18,10 +18,11 @@
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
+import org.apache.gossip.Member;
import org.apache.gossip.StartupSettings;
import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.handlers.MessageHandlerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
index 1836309..652bf5c 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
@@ -26,27 +26,40 @@ import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.log4j.Logger;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.function.BiFunction;
-public class GossipMemberStateRefresher implements Runnable {
+public class GossipMemberStateRefresher {
public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
private final Map<LocalMember, GossipState> members;
private final GossipSettings settings;
- private final GossipListener listener;
+ private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
private final Clock clock;
private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
+ private final ExecutorService listenerExecutor;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final BlockingQueue<Runnable> workQueue;
public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
- GossipListener listener, BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
+ GossipListener listener,
+ BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
this.members = members;
this.settings = settings;
- this.listener = listener;
+ listeners.add(listener);
this.findPerNodeGossipData = findPerNodeGossipData;
clock = new SystemClock();
+ workQueue = new ArrayBlockingQueue<>(1024);
+ listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+ scheduledExecutor = Executors.newScheduledThreadPool(1);
+ }
+
+ public void init() {
+ scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
}
public void run() {
@@ -74,7 +87,9 @@ public class GossipMemberStateRefresher implements Runnable {
if (entry.getValue() != requiredState) {
members.put(entry.getKey(), requiredState);
- listener.gossipEvent(entry.getKey(), requiredState);
+ /* Call listeners asynchronously */
+ for (GossipListener listener: listeners)
+ listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
}
}
}
@@ -112,10 +127,31 @@ public class GossipMemberStateRefresher implements Runnable {
if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
members.put(l.getKey(), GossipState.DOWN);
if (l.getValue() == GossipState.UP) {
- listener.gossipEvent(l.getKey(), GossipState.DOWN);
+ for (GossipListener listener: listeners)
+ listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
}
return true;
}
return false;
}
+
+ public void register(GossipListener listener) {
+ listeners.add(listener);
+ }
+
+ public void shutdown() {
+ scheduledExecutor.shutdown();
+ try {
+ scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.debug("Issue during shutdown", e);
+ }
+ listenerExecutor.shutdown();
+ try {
+ listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.debug("Issue during shutdown", e);
+ }
+ listenerExecutor.shutdownNow();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
new file mode 100644
index 0000000..7d498b4
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.LocalMember;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Base implementation: gossips randomly to live nodes and periodically gossips to dead ones.
+ *
+ */
+public class SimpleActiveGossiper extends AbstractActiveGossiper {
+
+ private ScheduledExecutorService scheduledExecutorService;
+ private final BlockingQueue<Runnable> workQueue;
+ private ThreadPoolExecutor threadService;
+
+ public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
+ MetricRegistry registry) {
+ super(gossipManager, gossipCore, registry);
+ scheduledExecutorService = Executors.newScheduledThreadPool(2);
+ workQueue = new ArrayBlockingQueue<Runnable>(1024);
+ threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+ }
+
+ @Override
+ public void init() {
+ super.init();
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ threadService.execute(() -> {
+ sendToALiveMember();
+ });
+ }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ sendToDeadMember();
+ }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> sendPerNodeData(gossipManager.getMyself(),
+ selectPartner(gossipManager.getLiveMembers())),
+ 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> sendSharedData(gossipManager.getMyself(),
+ selectPartner(gossipManager.getLiveMembers())),
+ 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ scheduledExecutorService.shutdown();
+ try {
+ scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.debug("Issue during shutdown", e);
+ }
+ sendShutdownMessage();
+ threadService.shutdown();
+ try {
+ threadService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.debug("Issue during shutdown", e);
+ }
+ }
+
+ protected void sendToALiveMember(){
+ LocalMember member = selectPartner(gossipManager.getLiveMembers());
+ sendMembershipList(gossipManager.getMyself(), member);
+ }
+
+ protected void sendToDeadMember(){
+ LocalMember member = selectPartner(gossipManager.getDeadMembers());
+ sendMembershipList(gossipManager.getMyself(), member);
+ }
+
+ /**
+ * Sends an optimistic shutdown message to several cluster nodes.
+ */
+ protected void sendShutdownMessage(){
+ List<LocalMember> l = gossipManager.getLiveMembers();
+ int sendTo = l.size() < 3 ? 1 : l.size() / 2;
+ for (int i = 0; i < sendTo; i++) {
+ threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
deleted file mode 100644
index e47fe2a..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.gossip.LocalMember;
-
-import com.codahale.metrics.MetricRegistry;
-
-/**
- * Base implementation gossips randomly to live nodes periodically gossips to dead ones
- *
- */
-public class SimpleActiveGossipper extends AbstractActiveGossiper {
-
- private ScheduledExecutorService scheduledExecutorService;
- private final BlockingQueue<Runnable> workQueue;
- private ThreadPoolExecutor threadService;
-
- public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
- MetricRegistry registry) {
- super(gossipManager, gossipCore, registry);
- scheduledExecutorService = Executors.newScheduledThreadPool(2);
- workQueue = new ArrayBlockingQueue<Runnable>(1024);
- threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
- new ThreadPoolExecutor.DiscardOldestPolicy());
- }
-
- @Override
- public void init() {
- super.init();
- scheduledExecutorService.scheduleAtFixedRate(() -> {
- threadService.execute(() -> {
- sendToALiveMember();
- });
- }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- scheduledExecutorService.scheduleAtFixedRate(() -> {
- sendToDeadMember();
- }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- scheduledExecutorService.scheduleAtFixedRate(
- () -> sendPerNodeData(gossipManager.getMyself(),
- selectPartner(gossipManager.getLiveMembers())),
- 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- scheduledExecutorService.scheduleAtFixedRate(
- () -> sendSharedData(gossipManager.getMyself(),
- selectPartner(gossipManager.getLiveMembers())),
- 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void shutdown() {
- super.shutdown();
- scheduledExecutorService.shutdown();
- try {
- scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.debug("Issue during shutdown", e);
- }
- sendShutdownMessage();
- threadService.shutdown();
- try {
- threadService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.debug("Issue during shutdown", e);
- }
- }
-
- protected void sendToALiveMember(){
- LocalMember member = selectPartner(gossipManager.getLiveMembers());
- sendMembershipList(gossipManager.getMyself(), member);
- }
-
- protected void sendToDeadMember(){
- LocalMember member = selectPartner(gossipManager.getDeadMembers());
- sendMembershipList(gossipManager.getMyself(), member);
- }
-
- /**
- * sends an optimistic shutdown message to several clusters nodes
- */
- protected void sendShutdownMessage(){
- List<LocalMember> l = gossipManager.getLiveMembers();
- int sendTo = l.size() < 3 ? 1 : l.size() / 2;
- for (int i = 0; i < sendTo; i++) {
- threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
index 031d90e..99354d1 100644
--- a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
@@ -20,7 +20,7 @@ package org.apache.gossip.transport;
import java.io.IOException;
import java.net.URI;
-/** interface for manage that sends and receives messages that have already been serialized. */
+/** interface for manager that sends and receives messages that have already been serialized. */
public interface TransportManager {
/** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */