You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gossip.apache.org by ec...@apache.org on 2017/01/15 21:31:48 UTC

incubator-gossip git commit: GOSSIP-43 cleanup sockets, readme, and use lambdas

Repository: incubator-gossip
Updated Branches:
  refs/heads/master 9487b8c22 -> 7af6b677e


GOSSIP-43 cleanup sockets, readme, and use lambdas


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/7af6b677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/7af6b677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/7af6b677

Branch: refs/heads/master
Commit: 7af6b677e8c7adbee6023c161afcd3bc30c7206c
Parents: 9487b8c
Author: Edward Capriolo <ed...@gmail.com>
Authored: Sun Jan 15 16:01:43 2017 -0500
Committer: Edward Capriolo <ed...@gmail.com>
Committed: Sun Jan 15 16:01:43 2017 -0500

----------------------------------------------------------------------
 README.md                                       | 37 +++++-----
 .../org/apache/gossip/GossipTimeoutTimer.java   | 78 --------------------
 .../gossip/manager/ActiveGossipThread.java      | 14 +---
 .../org/apache/gossip/manager/GossipCore.java   | 37 ++++++----
 .../apache/gossip/manager/GossipManager.java    | 29 +++-----
 5 files changed, 57 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7af6b677/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 5bdc620..86f179a 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,8 @@ To gossip you need one or more seed nodes. Seed is just a list of places to init
   int seedNodes = 3;
   List<GossipMember> startupMembers = new ArrayList<>();
   for (int i = 1; i < seedNodes+1; ++i) {
-    startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + ""));
+    URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+    startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
   }
 ```
 
@@ -24,10 +25,9 @@ Here we start five gossip processes and check that they discover each other. (No
   List<GossipService> clients = new ArrayList<>();
   int clusterMembers = 5;
   for (int i = 1; i < clusterMembers+1; ++i) {
-    GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "",
-      LogLevel.DEBUG, startupMembers, settings, null);
-    clients.add(gossipService);
-    gossipService.start();
+    URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
+   GossipService gossipService = new GossipService(cluster, uri, i + "",
+             startupMembers, settings, (a,b) -> {});
   }
 ```
 
@@ -47,22 +47,24 @@ For a very simple client setup with a settings file you first need a JSON file s
 
 ```json
 [{
-  "id":"419af818-0114-4c7b-8fdb-952915335ce4",
-  "port":50001,
+  "cluster":"9f1e6ddf-8e1c-4026-8fc6-8585d0132f77",
+  "id":"447c5bec-f112-492d-968b-f64c8e36dfd7",
+  "uri":"udp://127.0.0.1:50001",
   "gossip_interval":1000,
   "cleanup_interval":10000,
   "members":[
-    {"host":"127.0.0.1", "port":50000}
+    {"cluster": "9f1e6ddf-8e1c-4026-8fc6-8585d0132f77","uri":"udp://127.0.0.1:5000"}
   ]
 }]
 ```
 
 where:
 
+* `cluster` - is the name of the cluster 
 * `id` - is a unique id for this node (you can use any string, but above we use a UUID)
-* `port` - the port to use on the default adapter on the node's machine
+* `uri` - is a URI object containing IP/hostname and port to use on the default adapter on the node's machine
 * `gossip_interval` - how often (in milliseconds) to gossip list of members to other node(s)
-* `cleanup_interval` - when to remove 'dead' nodes (in milliseconds)
+* `cleanup_interval` - when to remove 'dead' nodes (in milliseconds) (deprecated may be coming back)
 * `members` - initial seed nodes
 
 Then starting a local node is as simple as:
@@ -96,13 +98,14 @@ These can be accessed from the `GossipManager` on your `GossipService`, e.g:
 Users can also attach an event listener:
 
 ```java
-  GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
-          startupMembers, settings,
-          new GossipListener(){
-    @Override
-    public void gossipEvent(GossipMember member, GossipState state) {
-      System.out.println(member+" "+ state);
-    }
+    GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
+    settings, new GossipListener() {
+      @Override
+      public void gossipEvent(GossipMember member, GossipState state) {
+        System.out.println(System.currentTimeMillis() + " Member " + j + " reports "
+                + member + " " + state);
+      }
   });
+  //The lambda syntax is (a,b) -> { }  //NICE!
 ```
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7af6b677/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipTimeoutTimer.java b/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
deleted file mode 100644
index 2fa09c0..0000000
--- a/src/main/java/org/apache/gossip/GossipTimeoutTimer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gossip;
-
-import java.util.Date;
-
-import javax.management.NotificationListener;
-import javax.management.timer.Timer;
-
-/**
- * This object represents a timer for a gossip member. When the timer has elapsed without being
- * reset in the meantime, it will inform the GossipService about this who in turn will put the
- * gossip member on the dead list, because it is apparantly not alive anymore.
- * 
- * @author joshclemm, harmenw
- */
-public class GossipTimeoutTimer extends Timer {
-
-  private final long sleepTime;
-
-  private final LocalGossipMember source;
-
-  /**
-   * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime.
-   * 
-   * @param millisecondsSleepTime
-   *          The time for this timer to wait before an event.
-   * @param notificationListener
-   * @param member
-   */
-  public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener,
-          LocalGossipMember member) {
-    super();
-    sleepTime = millisecondsSleepTime;
-    source = member;
-    addNotificationListener(notificationListener, null, null);
-  }
-
-  /**
-   * @see javax.management.timer.Timer#start()
-   */
-  public void start() {
-    this.reset();
-    super.start();
-  }
-
-  /**
-   * Resets timer to start counting down from original time.
-   */
-  public void reset() {
-    removeAllNotifications();
-    setWakeupTime(sleepTime);
-  }
-
-  /**
-   * Adds a new wake-up time for this timer.
-   * 
-   * @param milliseconds
-   */
-  private void setWakeupTime(long milliseconds) {
-    addNotification("type", "message", source, new Date(System.currentTimeMillis() + milliseconds));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7af6b677/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index c09cfe9..731b019 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -18,7 +18,6 @@
 package org.apache.gossip.manager;
 
 import java.io.IOException;
-import java.net.DatagramSocket;
 import java.util.List;
 
 import java.util.Map.Entry;
@@ -116,8 +115,7 @@ public class ActiveGossipThread {
       sharedDataHistogram.update(System.currentTimeMillis() - startTime);
       return;
     }
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+    try {
       for (Entry<String, SharedGossipDataMessage> innerEntry : this.gossipCore.getSharedData().entrySet()){
           UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
           message.setUuid(UUID.randomUUID().toString());
@@ -127,7 +125,6 @@ public class ActiveGossipThread {
           message.setNodeId(innerEntry.getValue().getNodeId());
           message.setTimestamp(innerEntry.getValue().getTimestamp());
           message.setPayload(innerEntry.getValue().getPayload());
-          message.setTimestamp(innerEntry.getValue().getTimestamp());
           byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
           int packet_length = json_bytes.length;
           if (packet_length < GossipManager.MAX_PACKET_SIZE) {
@@ -152,8 +149,7 @@ public class ActiveGossipThread {
       sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
       return;
     }
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+    try {
       for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
         for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
           UdpGossipDataMessage message = new UdpGossipDataMessage();
@@ -164,7 +160,6 @@ public class ActiveGossipThread {
           message.setNodeId(innerEntry.getValue().getNodeId());
           message.setTimestamp(innerEntry.getValue().getTimestamp());
           message.setPayload(innerEntry.getValue().getPayload());
-          message.setTimestamp(innerEntry.getValue().getTimestamp());
           byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
           int packet_length = json_bytes.length;
           if (packet_length < GossipManager.MAX_PACKET_SIZE) {
@@ -190,12 +185,12 @@ public class ActiveGossipThread {
     LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
     sendMembershipList(gossipManager.getMyself(), member);
   }
+  
   /**
    * Performs the sending of the membership list, after we have incremented our own heartbeat.
    */
   protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
     long startTime = System.currentTimeMillis();
-
     me.setHeartbeat(System.nanoTime());
     if (member == null) {
       LOGGER.debug("Send sendMembershipList() is called without action");
@@ -204,8 +199,7 @@ public class ActiveGossipThread {
     } else {
       LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
     }
-    try (DatagramSocket socket = new DatagramSocket()) {
-      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+    try {
       UdpActiveGossipMessage message = new UdpActiveGossipMessage();
       message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
       message.setUuid(UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7af6b677/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index eaea8f6..de940c6 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -67,7 +67,6 @@ public class GossipCore {
   private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
   private final BlockingQueue<Runnable> workQueue;
   
-  
   public GossipCore(GossipManager manager){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
@@ -175,6 +174,11 @@ public class GossipCore {
     }
   }
   
+  /**
+   * Sends a blocking  message. Throws exception when tranmission fails 
+   * @param message
+   * @param uri
+   */
   private void sendInternal(Base message, URI uri){
     byte[] json_bytes;
     try {
@@ -186,6 +190,7 @@ public class GossipCore {
     if (packet_length < GossipManager.MAX_PACKET_SIZE) {
       byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
       try (DatagramSocket socket = new DatagramSocket()) {
+        socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
         InetAddress dest = InetAddress.getByName(uri.getHost());
         DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort());
         socket.send(datagramPacket);
@@ -245,9 +250,14 @@ public class GossipCore {
         requests.remove(t.getUuid() + "/" + t.getUriFrom());
       }
     }
-    
   }
   
+  /**
+   * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
+   * when the protocol for the message is not to wait for a response
+   * @param message the message to send
+   * @param u the uri to send it to
+   */
   public void sendOneWay(Base message, URI u){
     byte[] json_bytes;
     try {
@@ -259,13 +269,13 @@ public class GossipCore {
     if (packet_length < GossipManager.MAX_PACKET_SIZE) {
       byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
       try (DatagramSocket socket = new DatagramSocket()) {
+        socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
         InetAddress dest = InetAddress.getByName(u.getHost());
         DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort());
         socket.send(datagramPacket);
       } catch (IOException ex) { }
     }
   }
-  
 
   /**
    * Merge lists from remote members and update heartbeats
@@ -280,36 +290,31 @@ public class GossipCore {
     if (LOGGER.isDebugEnabled()){
       debugState(senderMember, remoteList);
     }
-    // if the person sending to us is in the dead list consider them up
     for (LocalGossipMember i : gossipManager.getDeadMembers()) {
       if (i.getId().equals(senderMember.getId())) {
         LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
         i.recordHeartbeat(senderMember.getHeartbeat());
         i.setHeartbeat(senderMember.getHeartbeat());
-        //TODO set node to  UP here
-        
+        //TODO consider forcing an UP here
       }
     }
     for (GossipMember remoteMember : remoteList) {
       if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
         continue;
       }
-      LocalGossipMember m = new LocalGossipMember(remoteMember.getClusterName(), 
+      LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(), 
       remoteMember.getUri(), 
       remoteMember.getId(), 
       remoteMember.getHeartbeat(), 
       gossipManager.getSettings().getWindowSize(),
       gossipManager.getSettings().getMinimumSamples());
-      m.recordHeartbeat(remoteMember.getHeartbeat());
-      
-      Object result = gossipManager.getMembers().putIfAbsent(m, GossipState.UP);
+      aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
+      Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
       if (result != null){
-        for (Entry<LocalGossipMember, GossipState> l : gossipManager.getMembers().entrySet()){
-          if (l.getKey().getId().equals(remoteMember.getId())){
-            //if (l.getKey().getHeartbeat() < remoteMember.getHeartbeat()){
-              l.getKey().recordHeartbeat(remoteMember.getHeartbeat());
-              l.getKey().setHeartbeat(remoteMember.getHeartbeat());
-            //}
+        for (Entry<LocalGossipMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
+          if (localMember.getKey().getId().equals(remoteMember.getId())){
+            localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
+            localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/7af6b677/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index a5d57f5..bb8c7fa 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.log4j.Logger;
 
@@ -86,7 +87,7 @@ public abstract class GossipManager {
     clock = new SystemClock();
     dataReaper = new DataReaper(gossipCore, clock);
     me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
-            +            settings.getWindowSize(), settings.getMinimumSamples());
+            settings.getWindowSize(), settings.getMinimumSamples());
     members = new ConcurrentSkipListMap<>();
     for (GossipMember startupMember : gossipMembers) {
       if (!startupMember.equals(me)) {
@@ -112,19 +113,15 @@ public abstract class GossipManager {
     return settings;
   }
 
-  // TODO: Use some java 8 goodness for these functions.
-  
   /**
    * @return a read only list of members found in the DOWN state.
    */
   public List<LocalGossipMember> getDeadMembers() {
-    List<LocalGossipMember> down = new ArrayList<>();
-    for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
-      if (GossipState.DOWN.equals(entry.getValue())) {
-        down.add(entry.getKey());
-      }
-    }
-    return Collections.unmodifiableList(down);
+    return Collections.unmodifiableList(
+            members.entrySet()
+            .stream()
+            .filter(entry -> GossipState.DOWN.equals(entry.getValue()))
+            .map(Entry::getKey).collect(Collectors.toList()));
   }
 
   /**
@@ -132,13 +129,11 @@ public abstract class GossipManager {
    * @return a read only list of members found in the UP state
    */
   public List<LocalGossipMember> getLiveMembers() {
-    List<LocalGossipMember> up = new ArrayList<>();
-    for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
-      if (GossipState.UP.equals(entry.getValue())) {
-        up.add(entry.getKey());
-      }
-    }
-    return Collections.unmodifiableList(up);
+    return Collections.unmodifiableList(
+            members.entrySet()
+            .stream()
+            .filter(entry -> GossipState.UP.equals(entry.getValue()))
+            .map(Entry::getKey).collect(Collectors.toList()));
   }
 
   public LocalGossipMember getMyself() {