You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2016/09/24 02:14:19 UTC

incubator-gossip git commit: GOSSIP-15 avoid busy loop (ChiaHung Lin via egc)

Repository: incubator-gossip
Updated Branches:
  refs/heads/master 9a1af76df -> 375cee2a3


GOSSIP-15 avoid busy loop (ChiaHung Lin via egc)


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/375cee2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/375cee2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/375cee2a

Branch: refs/heads/master
Commit: 375cee2a3b899931e3aa5372da1199f49484b44c
Parents: 9a1af76
Author: Edward Capriolo <ed...@gmail.com>
Authored: Fri Sep 23 22:13:40 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Fri Sep 23 22:13:40 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/gossip/GossipService.java   |   4 +-
 .../gossip/manager/ActiveGossipThread.java      | 124 ++++++++++++++-----
 .../apache/gossip/manager/GossipManager.java    |  17 +--
 .../random/RandomActiveGossipThread.java        | 120 ------------------
 4 files changed, 99 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index ce15992..68a4ca2 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -70,8 +70,8 @@ public class GossipService {
   }
 
   public void start() {
-    LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri());
-    gossipManager.start();
+    LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri());
+    gossipManager.init();
   }
 
   public void shutdown() {

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 181d9ae..b57c25a 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -17,59 +17,103 @@
  */
 package org.apache.gossip.manager;
 
+import java.io.IOException;
+import java.net.DatagramSocket;
 import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.gossip.GossipService;
 import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.model.ActiveGossipOk;
+import org.apache.gossip.model.GossipMember;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * [The active thread: periodically send gossip request.] The class handles gossiping the membership
  * list. This information is important to maintaining a common state among all the nodes, and is
  * important for detecting failures.
  */
-abstract public class ActiveGossipThread implements Runnable {
+public class ActiveGossipThread {
 
+  public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
+  
+  private ScheduledExecutorService scheduledExecutorService ;
+  private ObjectMapper MAPPER = new ObjectMapper();
+  private final Random random;
   protected final GossipManager gossipManager;
+  private final GossipCore gossipCore;
 
-  private final AtomicBoolean keepRunning;
-
-  public ActiveGossipThread(GossipManager gossipManager) {
+  public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
     this.gossipManager = gossipManager;
-    this.keepRunning = new AtomicBoolean(true);
+    this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
+    this.gossipCore = gossipCore;
+    this.random = new Random();
   }
 
-  @Override
-  public void run() {
-    while (keepRunning.get()) {
-      try {
-        TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
-        
-        // contact a live member.
-        sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers());
-        
-        // contact a dead member.
-        sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers());
-        
-      } catch (InterruptedException e) {
-        GossipService.LOGGER.error(e);
-        keepRunning.set(false);
-      }
-    }
-    shutdown();
+  public void init() {
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+            gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
+            gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
   }
-
+  
   public void shutdown() {
-    keepRunning.set(false);
+    this.scheduledExecutorService.shutdown();
+    try {
+      this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.warn("Did not complete shutdown", e);
+    }
   }
 
   /**
    * Performs the sending of the membership list, after we have incremented our own heartbeat.
    */
-  abstract protected void sendMembershipList(LocalGossipMember me,
-          List<LocalGossipMember> memberList);
-
+ protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
+    
+    me.setHeartbeat(System.currentTimeMillis());
+    LocalGossipMember member = selectPartner(memberList);
+    if (member == null) {
+      GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
+      return;
+    } else {
+      GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
+    }
+    
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+      UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+      message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+      message.setUuid(UUID.randomUUID().toString());
+      message.getMembers().add(convert(me));
+      for (LocalGossipMember other : memberList) {
+        message.getMembers().add(convert(other));
+      }
+      byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
+      int packet_length = json_bytes.length;
+      if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+        Response r = gossipCore.send(message, member.getUri());
+        if (r instanceof ActiveGossipOk){
+          //maybe count metrics here
+        } else {
+          LOGGER.warn("Message "+ message + " generated response "+ r);
+        }
+      } else {
+        GossipService.LOGGER.error("The length of the to be send message is too large ("
+                + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+      }
+    } catch (IOException e1) {
+      GossipService.LOGGER.warn(e1);
+    }
+  }
   /**
    * Abstract method which should be implemented by a subclass. This method should return a member
    * of the list to gossip with.
@@ -78,5 +122,23 @@ abstract public class ActiveGossipThread implements Runnable {
    *          The list of members which are stored in the local list of members.
    * @return The chosen LocalGossipMember to gossip with.
    */
-  abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
+ protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+   LocalGossipMember member = null;
+   if (memberList.size() > 0) {
+     int randomNeighborIndex = random.nextInt(memberList.size());
+     member = memberList.get(randomNeighborIndex);
+   } else {
+     LOGGER.debug("I am alone in this world.");
+   }
+   return member;
+ }
+  
+  private GossipMember convert(LocalGossipMember member){
+    GossipMember gm = new GossipMember();
+    gm.setCluster(member.getClusterName());
+    gm.setHeartbeat(member.getHeartbeat());
+    gm.setUri(member.getUri().toASCIIString());
+    gm.setId(member.getId());
+    return gm;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 79be431..0b2cfd2 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -40,9 +40,8 @@ import org.apache.gossip.LocalGossipMember;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
 import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
-import org.apache.gossip.manager.random.RandomActiveGossipThread;
 
-public abstract class GossipManager extends Thread implements NotificationListener {
+public abstract class GossipManager implements NotificationListener {
 
   public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
 
@@ -179,7 +178,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
    * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
    * thread and start the receiver thread.
    */
-  public void run() {
+  public void init() {
     for (LocalGossipMember member : members.keySet()) {
       if (member != me) {
         member.startTimeoutTimer();
@@ -187,17 +186,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
     }
     passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
     gossipThreadExecutor.execute(passiveGossipThread);
-    activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore);
-    gossipThreadExecutor.execute(activeGossipThread);
+    activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
+    activeGossipThread.init();
     GossipService.LOGGER.debug("The GossipService is started.");
-    while (gossipServiceRunning.get()) {
-      try {
-        // TODO
-        TimeUnit.MILLISECONDS.sleep(1);
-      } catch (InterruptedException e) {
-        GossipService.LOGGER.warn("The GossipClient was interrupted.");
-      }
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
deleted file mode 100644
index 03d550c..0000000
--- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.random;
-
-import java.io.IOException;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import org.apache.gossip.GossipService;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.manager.ActiveGossipThread;
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipOk;
-import org.apache.gossip.model.GossipMember;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-
-public class RandomActiveGossipThread extends ActiveGossipThread {
-
-  public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class);
-  protected ObjectMapper MAPPER = new ObjectMapper();
-  
-  /** The Random used for choosing a member to gossip with. */
-  private final Random random;
-  private final GossipCore gossipCore;
-
-  public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
-    super(gossipManager);
-    random = new Random();
-    this.gossipCore = gossipCore;
-  }
-
-  /**
-   * [The selectToSend() function.] Find a random peer from the local membership list. In the case
-   * where this client is the only member in the list, this method will return null.
-   * 
-   * @return Member random member if list is greater than 1, null otherwise
-   */
-  protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
-    LocalGossipMember member = null;
-    if (memberList.size() > 0) {
-      int randomNeighborIndex = random.nextInt(memberList.size());
-      member = memberList.get(randomNeighborIndex);
-    } else {
-      GossipService.LOGGER.debug("I am alone in this world.");
-      
-    }
-    return member;
-  }
-
-  protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
-    
-    me.setHeartbeat(System.currentTimeMillis());
-    LocalGossipMember member = selectPartner(memberList);
-    if (member == null) {
-      GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
-      return;
-    } else {
-      GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
-    }
-    
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
-      UdpActiveGossipMessage message = new UdpActiveGossipMessage();
-      message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
-      message.setUuid(UUID.randomUUID().toString());
-      message.getMembers().add(convert(me));
-      for (LocalGossipMember other : memberList) {
-        message.getMembers().add(convert(other));
-      }
-      byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
-      int packet_length = json_bytes.length;
-      if (packet_length < GossipManager.MAX_PACKET_SIZE) {
-        Response r = gossipCore.send(message, member.getUri());
-        if (r instanceof ActiveGossipOk){
-          //maybe count metrics here
-        } else {
-          LOGGER.warn("Message "+ message + " generated response "+ r);
-        }
-      } else {
-        GossipService.LOGGER.error("The length of the to be send message is too large ("
-                + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
-      }
-    } catch (IOException e1) {
-      GossipService.LOGGER.warn(e1);
-    }
-  }
-  
-  private GossipMember convert(LocalGossipMember member){
-    GossipMember gm = new GossipMember();
-    gm.setCluster(member.getClusterName());
-    gm.setHeartbeat(member.getHeartbeat());
-    gm.setUri(member.getUri().toASCIIString());
-    gm.setId(member.getId());
-    return gm;
-  }
-  
-}