You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2016/09/24 02:14:19 UTC
incubator-gossip git commit: GOSSIP-15 avoid busy loop (ChiaHung Lin
via egc)
Repository: incubator-gossip
Updated Branches:
refs/heads/master 9a1af76df -> 375cee2a3
GOSSIP-15 avoid busy loop (ChiaHung Lin via egc)
Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/375cee2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/375cee2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/375cee2a
Branch: refs/heads/master
Commit: 375cee2a3b899931e3aa5372da1199f49484b44c
Parents: 9a1af76
Author: Edward Capriolo <ed...@gmail.com>
Authored: Fri Sep 23 22:13:40 2016 -0400
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Fri Sep 23 22:13:40 2016 -0400
----------------------------------------------------------------------
.../java/org/apache/gossip/GossipService.java | 4 +-
.../gossip/manager/ActiveGossipThread.java | 124 ++++++++++++++-----
.../apache/gossip/manager/GossipManager.java | 17 +--
.../random/RandomActiveGossipThread.java | 120 ------------------
4 files changed, 99 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index ce15992..68a4ca2 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -70,8 +70,8 @@ public class GossipService {
}
public void start() {
- LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri());
- gossipManager.start();
+ LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri());
+ gossipManager.init();
}
public void shutdown() {
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 181d9ae..b57c25a 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -17,59 +17,103 @@
*/
package org.apache.gossip.manager;
+import java.io.IOException;
+import java.net.DatagramSocket;
import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
+import org.apache.gossip.model.ActiveGossipOk;
+import org.apache.gossip.model.GossipMember;
+import org.apache.gossip.model.Response;
+import org.apache.gossip.udp.UdpActiveGossipMessage;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
/**
* [The active thread: periodically send gossip request.] The class handles gossiping the membership
* list. This information is important to maintaining a common state among all the nodes, and is
* important for detecting failures.
*/
-abstract public class ActiveGossipThread implements Runnable {
+public class ActiveGossipThread {
+ public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
+
+ private ScheduledExecutorService scheduledExecutorService ;
+ private ObjectMapper MAPPER = new ObjectMapper();
+ private final Random random;
protected final GossipManager gossipManager;
+ private final GossipCore gossipCore;
- private final AtomicBoolean keepRunning;
-
- public ActiveGossipThread(GossipManager gossipManager) {
+ public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
- this.keepRunning = new AtomicBoolean(true);
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
+ this.gossipCore = gossipCore;
+ this.random = new Random();
}
- @Override
- public void run() {
- while (keepRunning.get()) {
- try {
- TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
-
- // contact a live member.
- sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers());
-
- // contact a dead member.
- sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers());
-
- } catch (InterruptedException e) {
- GossipService.LOGGER.error(e);
- keepRunning.set(false);
- }
- }
- shutdown();
+ public void init() {
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+ gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
+ gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
-
+
public void shutdown() {
- keepRunning.set(false);
+ this.scheduledExecutorService.shutdown();
+ try {
+ this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Did not complete shutdown", e);
+ }
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
- abstract protected void sendMembershipList(LocalGossipMember me,
- List<LocalGossipMember> memberList);
-
+ protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
+
+ me.setHeartbeat(System.currentTimeMillis());
+ LocalGossipMember member = selectPartner(memberList);
+ if (member == null) {
+ GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
+ return;
+ } else {
+ GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
+ }
+
+ try (DatagramSocket socket = new DatagramSocket()) {
+ socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+ UdpActiveGossipMessage message = new UdpActiveGossipMessage();
+ message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
+ message.setUuid(UUID.randomUUID().toString());
+ message.getMembers().add(convert(me));
+ for (LocalGossipMember other : memberList) {
+ message.getMembers().add(convert(other));
+ }
+ byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
+ int packet_length = json_bytes.length;
+ if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+ Response r = gossipCore.send(message, member.getUri());
+ if (r instanceof ActiveGossipOk){
+ //maybe count metrics here
+ } else {
+ LOGGER.warn("Message "+ message + " generated response "+ r);
+ }
+ } else {
+ GossipService.LOGGER.error("The length of the to be send message is too large ("
+ + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+ }
+ } catch (IOException e1) {
+ GossipService.LOGGER.warn(e1);
+ }
+ }
/**
* Abstract method which should be implemented by a subclass. This method should return a member
* of the list to gossip with.
@@ -78,5 +122,23 @@ abstract public class ActiveGossipThread implements Runnable {
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.
*/
- abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
+ protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
+ LocalGossipMember member = null;
+ if (memberList.size() > 0) {
+ int randomNeighborIndex = random.nextInt(memberList.size());
+ member = memberList.get(randomNeighborIndex);
+ } else {
+ LOGGER.debug("I am alone in this world.");
+ }
+ return member;
+ }
+
+ private GossipMember convert(LocalGossipMember member){
+ GossipMember gm = new GossipMember();
+ gm.setCluster(member.getClusterName());
+ gm.setHeartbeat(member.getHeartbeat());
+ gm.setUri(member.getUri().toASCIIString());
+ gm.setId(member.getId());
+ return gm;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 79be431..0b2cfd2 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -40,9 +40,8 @@ import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
-import org.apache.gossip.manager.random.RandomActiveGossipThread;
-public abstract class GossipManager extends Thread implements NotificationListener {
+public abstract class GossipManager implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
@@ -179,7 +178,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
*/
- public void run() {
+ public void init() {
for (LocalGossipMember member : members.keySet()) {
if (member != me) {
member.startTimeoutTimer();
@@ -187,17 +186,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
}
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread);
- activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore);
- gossipThreadExecutor.execute(activeGossipThread);
+ activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
+ activeGossipThread.init();
GossipService.LOGGER.debug("The GossipService is started.");
- while (gossipServiceRunning.get()) {
- try {
- // TODO
- TimeUnit.MILLISECONDS.sleep(1);
- } catch (InterruptedException e) {
- GossipService.LOGGER.warn("The GossipClient was interrupted.");
- }
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/375cee2a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
deleted file mode 100644
index 03d550c..0000000
--- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip.manager.random;
-
-import java.io.IOException;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import org.apache.gossip.GossipService;
-import org.apache.gossip.LocalGossipMember;
-import org.apache.gossip.manager.ActiveGossipThread;
-import org.apache.gossip.manager.GossipCore;
-import org.apache.gossip.manager.GossipManager;
-import org.apache.gossip.model.ActiveGossipOk;
-import org.apache.gossip.model.GossipMember;
-import org.apache.gossip.model.Response;
-import org.apache.gossip.udp.UdpActiveGossipMessage;
-import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-
-public class RandomActiveGossipThread extends ActiveGossipThread {
-
- public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class);
- protected ObjectMapper MAPPER = new ObjectMapper();
-
- /** The Random used for choosing a member to gossip with. */
- private final Random random;
- private final GossipCore gossipCore;
-
- public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
- super(gossipManager);
- random = new Random();
- this.gossipCore = gossipCore;
- }
-
- /**
- * [The selectToSend() function.] Find a random peer from the local membership list. In the case
- * where this client is the only member in the list, this method will return null.
- *
- * @return Member random member if list is greater than 1, null otherwise
- */
- protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
- LocalGossipMember member = null;
- if (memberList.size() > 0) {
- int randomNeighborIndex = random.nextInt(memberList.size());
- member = memberList.get(randomNeighborIndex);
- } else {
- GossipService.LOGGER.debug("I am alone in this world.");
-
- }
- return member;
- }
-
- protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
-
- me.setHeartbeat(System.currentTimeMillis());
- LocalGossipMember member = selectPartner(memberList);
- if (member == null) {
- GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
- return;
- } else {
- GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
- }
-
- try (DatagramSocket socket = new DatagramSocket()) {
- socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
- UdpActiveGossipMessage message = new UdpActiveGossipMessage();
- message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
- message.setUuid(UUID.randomUUID().toString());
- message.getMembers().add(convert(me));
- for (LocalGossipMember other : memberList) {
- message.getMembers().add(convert(other));
- }
- byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
- int packet_length = json_bytes.length;
- if (packet_length < GossipManager.MAX_PACKET_SIZE) {
- Response r = gossipCore.send(message, member.getUri());
- if (r instanceof ActiveGossipOk){
- //maybe count metrics here
- } else {
- LOGGER.warn("Message "+ message + " generated response "+ r);
- }
- } else {
- GossipService.LOGGER.error("The length of the to be send message is too large ("
- + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
- }
- } catch (IOException e1) {
- GossipService.LOGGER.warn(e1);
- }
- }
-
- private GossipMember convert(LocalGossipMember member){
- GossipMember gm = new GossipMember();
- gm.setCluster(member.getClusterName());
- gm.setHeartbeat(member.getHeartbeat());
- gm.setUri(member.getUri().toASCIIString());
- gm.setId(member.getId());
- return gm;
- }
-
-}