You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2017/08/23 02:56:04 UTC

incubator-gossip git commit: GOSSIP-38 Multiple async GossipListeners

Repository: incubator-gossip
Updated Branches:
  refs/heads/master 968571a56 -> ac8303893


GOSSIP-38 Multiple async GossipListeners


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/ac830389
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/ac830389
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/ac830389

Branch: refs/heads/master
Commit: ac830389327508688c664a01463d1ddfa3fb6721
Parents: 968571a
Author: pxsalehi <px...@gmail.com>
Authored: Mon Aug 21 11:58:15 2017 +0200
Committer: pxsalehi <px...@gmail.com>
Committed: Mon Aug 21 11:58:15 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/gossip/GossipSettings.java  |   2 +-
 .../src/main/java/org/apache/gossip/Member.java |   2 +-
 .../apache/gossip/manager/GossipManager.java    |   7 +-
 .../gossip/manager/GossipManagerBuilder.java    |   3 +-
 .../manager/GossipMemberStateRefresher.java     |  50 +++++++--
 .../gossip/manager/SimpleActiveGossiper.java    | 110 +++++++++++++++++++
 .../gossip/manager/SimpleActiveGossipper.java   | 110 -------------------
 .../gossip/transport/TransportManager.java      |   2 +-
 8 files changed, 164 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
index 792af85..32c00c9 100644
--- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java
@@ -43,7 +43,7 @@ public class GossipSettings {
   
   private String distribution = "normal";
   
-  private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
+  private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossiper";
 
   private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager";
   private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager";

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/Member.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/Member.java b/gossip-base/src/main/java/org/apache/gossip/Member.java
index d04a7b6..54a6737 100644
--- a/gossip-base/src/main/java/org/apache/gossip/Member.java
+++ b/gossip-base/src/main/java/org/apache/gossip/Member.java
@@ -22,7 +22,7 @@ import java.net.URI;
 import java.util.Map;
 
 /**
- * A abstract class representing a gossip member.
+ * An abstract class representing a gossip member.
  * 
  */
 public abstract class Member implements Comparable<Member> {

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
index d839b2e..db442c6 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -185,7 +185,7 @@ public abstract class GossipManager {
     if (settings.isPersistDataState()) {
       scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
     }
-    scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
+    memberStateRefresher.init();
     LOGGER.debug("The GossipManager is started.");
   }
   
@@ -224,6 +224,7 @@ public abstract class GossipManager {
     gossipCore.shutdown();
     transportManager.shutdown();
     dataReaper.close();
+    memberStateRefresher.shutdown();
     scheduledServiced.shutdown();
     try {
       scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
@@ -366,4 +367,8 @@ public abstract class GossipManager {
   public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
     gossipCore.unregisterSharedDataSubscriber(handler);
   }
+
+  public void registerGossipListener(GossipListener listener) {
+    memberStateRefresher.register(listener);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
index 86dca57..f3ca23a 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java
@@ -18,10 +18,11 @@
 package org.apache.gossip.manager;
 
 import com.codahale.metrics.MetricRegistry;
-import org.apache.gossip.Member;
 import org.apache.gossip.GossipSettings;
+import org.apache.gossip.Member;
 import org.apache.gossip.StartupSettings;
 import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
 import org.apache.gossip.manager.handlers.MessageHandler;
 import org.apache.gossip.manager.handlers.MessageHandlerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
index 1836309..652bf5c 100644
--- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
@@ -26,27 +26,40 @@ import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.ShutdownMessage;
 import org.apache.log4j.Logger;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.function.BiFunction;
 
-public class GossipMemberStateRefresher implements Runnable {
+public class GossipMemberStateRefresher {
   public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
 
   private final Map<LocalMember, GossipState> members;
   private final GossipSettings settings;
-  private final GossipListener listener;
+  private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
   private final Clock clock;
   private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
+  private final ExecutorService listenerExecutor;
+  private final ScheduledExecutorService scheduledExecutor;
+  private final BlockingQueue<Runnable> workQueue;
 
   public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
-                                    GossipListener listener, BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
+                                    GossipListener listener,
+                                    BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
     this.members = members;
     this.settings = settings;
-    this.listener = listener;
+    listeners.add(listener);
     this.findPerNodeGossipData = findPerNodeGossipData;
     clock = new SystemClock();
+    workQueue = new ArrayBlockingQueue<>(1024);
+    listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+    scheduledExecutor = Executors.newScheduledThreadPool(1);
+  }
+
+  public void init() {
+    scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
   }
 
   public void run() {
@@ -74,7 +87,9 @@ public class GossipMemberStateRefresher implements Runnable {
 
       if (entry.getValue() != requiredState) {
         members.put(entry.getKey(), requiredState);
-        listener.gossipEvent(entry.getKey(), requiredState);
+        /* Call listeners asynchronously */
+        for (GossipListener listener: listeners)
+          listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
       }
     }
   }
@@ -112,10 +127,31 @@ public class GossipMemberStateRefresher implements Runnable {
     if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
       members.put(l.getKey(), GossipState.DOWN);
       if (l.getValue() == GossipState.UP) {
-        listener.gossipEvent(l.getKey(), GossipState.DOWN);
+        for (GossipListener listener: listeners)
+          listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
       }
       return true;
     }
     return false;
   }
+
+  public void register(GossipListener listener) {
+    listeners.add(listener);
+  }
+
+  public void shutdown() {
+    scheduledExecutor.shutdown();
+    try {
+      scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+    listenerExecutor.shutdown();
+    try {
+      listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+    listenerExecutor.shutdownNow();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
new file mode 100644
index 0000000..7d498b4
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.gossip.LocalMember;
+
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Base implementation: gossips randomly to live nodes and periodically gossips to dead ones.
+ *
+ */
+public class SimpleActiveGossiper extends AbstractActiveGossiper {
+
+  private ScheduledExecutorService scheduledExecutorService;
+  private final BlockingQueue<Runnable> workQueue;
+  private ThreadPoolExecutor threadService;
+  
+  public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
+                              MetricRegistry registry) {
+    super(gossipManager, gossipCore, registry);
+    scheduledExecutorService = Executors.newScheduledThreadPool(2);
+    workQueue = new ArrayBlockingQueue<Runnable>(1024);
+    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+  }
+
+  @Override
+  public void init() {
+    super.init();
+    scheduledExecutorService.scheduleAtFixedRate(() -> {
+      threadService.execute(() -> {
+        sendToALiveMember();
+      });
+    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(() -> {
+      sendToDeadMember();
+    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendPerNodeData(gossipManager.getMyself(),
+                    selectPartner(gossipManager.getLiveMembers())),
+            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendSharedData(gossipManager.getMyself(),
+                    selectPartner(gossipManager.getLiveMembers())),
+            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+  }
+  
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    scheduledExecutorService.shutdown();
+    try {
+      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+    sendShutdownMessage();
+    threadService.shutdown();
+    try {
+      threadService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.debug("Issue during shutdown", e);
+    }
+  }
+
+  protected void sendToALiveMember(){
+    LocalMember member = selectPartner(gossipManager.getLiveMembers());
+    sendMembershipList(gossipManager.getMyself(), member);
+  }
+  
+  protected void sendToDeadMember(){
+    LocalMember member = selectPartner(gossipManager.getDeadMembers());
+    sendMembershipList(gossipManager.getMyself(), member);
+  }
+  
+  /**
+   * Sends an optimistic shutdown message to several cluster nodes.
+   */
+  protected void sendShutdownMessage(){
+    List<LocalMember> l = gossipManager.getLiveMembers();
+    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
+    for (int i = 0; i < sendTo; i++) {
+      threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
deleted file mode 100644
index e47fe2a..0000000
--- a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager;
-
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.gossip.LocalMember;
-
-import com.codahale.metrics.MetricRegistry;
-
-/**
- * Base implementation gossips randomly to live nodes periodically gossips to dead ones
- *
- */
-public class SimpleActiveGossipper extends AbstractActiveGossiper {
-
-  private ScheduledExecutorService scheduledExecutorService;
-  private final BlockingQueue<Runnable> workQueue;
-  private ThreadPoolExecutor threadService;
-  
-  public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
-          MetricRegistry registry) {
-    super(gossipManager, gossipCore, registry);
-    scheduledExecutorService = Executors.newScheduledThreadPool(2);
-    workQueue = new ArrayBlockingQueue<Runnable>(1024);
-    threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
-            new ThreadPoolExecutor.DiscardOldestPolicy());
-  }
-
-  @Override
-  public void init() {
-    super.init();
-    scheduledExecutorService.scheduleAtFixedRate(() -> {
-      threadService.execute(() -> {
-        sendToALiveMember();
-      });
-    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-    scheduledExecutorService.scheduleAtFixedRate(() -> {
-      sendToDeadMember();
-    }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-    scheduledExecutorService.scheduleAtFixedRate(
-            () -> sendPerNodeData(gossipManager.getMyself(),
-                    selectPartner(gossipManager.getLiveMembers())),
-            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-    scheduledExecutorService.scheduleAtFixedRate(
-            () -> sendSharedData(gossipManager.getMyself(),
-                    selectPartner(gossipManager.getLiveMembers())),
-            0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
-  }
-  
-  @Override
-  public void shutdown() {
-    super.shutdown();
-    scheduledExecutorService.shutdown();
-    try {
-      scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.debug("Issue during shutdown", e);
-    }
-    sendShutdownMessage();
-    threadService.shutdown();
-    try {
-      threadService.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      LOGGER.debug("Issue during shutdown", e);
-    }
-  }
-
-  protected void sendToALiveMember(){
-    LocalMember member = selectPartner(gossipManager.getLiveMembers());
-    sendMembershipList(gossipManager.getMyself(), member);
-  }
-  
-  protected void sendToDeadMember(){
-    LocalMember member = selectPartner(gossipManager.getDeadMembers());
-    sendMembershipList(gossipManager.getMyself(), member);
-  }
-  
-  /**
-   * sends an optimistic shutdown message to several clusters nodes
-   */
-  protected void sendShutdownMessage(){
-    List<LocalMember> l = gossipManager.getLiveMembers();
-    int sendTo = l.size() < 3 ? 1 : l.size() / 2;
-    for (int i = 0; i < sendTo; i++) {
-      threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ac830389/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
----------------------------------------------------------------------
diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
index 031d90e..99354d1 100644
--- a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
+++ b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java
@@ -20,7 +20,7 @@ package org.apache.gossip.transport;
 import java.io.IOException;
 import java.net.URI;
 
-/** interface for manage that sends and receives messages that have already been serialized. */
+/** Interface for a manager that sends and receives messages that have already been serialized. */
 public interface TransportManager {
   
   /** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */