You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/06/18 21:19:08 UTC

[05/13] git commit: Clean up gossiper fully after shadow round.

Clean up gossiper fully after shadow round.

Patch by brandonwilliams, reviewed by thobbs for CASSANDRA-7318


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7bceedf4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7bceedf4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7bceedf4

Branch: refs/heads/trunk
Commit: 7bceedf47017f625779c00df3d483a88010681f5
Parents: 498ee24
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jun 18 14:06:59 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jun 18 14:08:58 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 src/java/org/apache/cassandra/gms/Gossiper.java |    2 +
 .../org/apache/cassandra/gms/Gossiper.java.orig | 1275 ++++++++++++++++++
 3 files changed, 1278 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bceedf4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8d81e3f..329da9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@ Merged from 2.0:
  * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
  * Fix infinite loop on exception while streaming (CASSANDRA-7330)
 Merged from 1.2:
+ * Fix problem truncating on a node that was previously in a dead state (CASSANDRA-7318)
  * Don't insert tombstones that hide indexed values into 2i (CASSANDRA-7268)
  * Track metrics at a keyspace level (CASSANDRA-6539)
  * Add replace_address_first_boot flag to only replace if not bootstrapped

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bceedf4/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 59b411a..f0189ed 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -704,6 +704,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     public void resetEndpointStateMap()
     {
         endpointStateMap.clear();
+        unreachableEndpoints.clear();
+        liveEndpoints.clear();
     }
 
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bceedf4/src/java/org/apache/cassandra/gms/Gossiper.java.orig
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java.orig b/src/java/org/apache/cassandra/gms/Gossiper.java.orig
new file mode 100644
index 0000000..a2addd1
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java.orig
@@ -0,0 +1,1275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.gms;
+
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * This module is responsible for Gossiping information for the local endpoint. This abstraction
+ * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
+ * chooses a random node and initiates a round of Gossip with it. A round of Gossip involves 3
+ * rounds of messaging. For instance if node A wants to initiate a round of Gossip with node B
+ * it starts off by sending node B a GossipDigestSynMessage. Node B on receipt of this message
+ * sends node A a GossipDigestAckMessage. On receipt of this message node A sends node B a
+ * GossipDigestAck2Message which completes a round of Gossip. This module as and when it hears one
+ * of the three above mentioned messages updates the Failure Detector with the liveness information.
+ * Upon hearing a GossipShutdownMessage, this module will instantly mark the remote node as down in
+ * the Failure Detector.
+ */
+
+public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
+{
+    private static final String MBEAN_NAME = "org.apache.cassandra.net:type=Gossiper";
+
+    private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks");
+
+    static final ApplicationState[] STATES = ApplicationState.values();
+    static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
+                                                          VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
+
+    private ScheduledFuture<?> scheduledGossipTask;
+    public final static int intervalInMillis = 1000;
+    public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
+    private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
+    public static final Gossiper instance = new Gossiper();
+
+    public static final long aVeryLongTime = 259200 * 1000; // 3 days
+    private long FatClientTimeout;
+    private final Random random = new Random();
+    private final Comparator<InetAddress> inetcomparator = new Comparator<InetAddress>()
+    {
+        public int compare(InetAddress addr1,  InetAddress addr2)
+        {
+            return addr1.getHostAddress().compareTo(addr2.getHostAddress());
+        }
+    };
+
+    /* subscribers for interest in EndpointState change */
+    private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<IEndpointStateChangeSubscriber>();
+
+    /* live member set */
+    private final Set<InetAddress> liveEndpoints = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
+
+    /* unreachable member set */
+    private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>();
+
+    /* initial seeds for joining the cluster */
+    private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
+
+    /* map where key is the endpoint and value is the state associated with the endpoint */
+    final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
+
+    /* map where key is endpoint and value is timestamp when this endpoint was removed from
+     * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
+     * after removal to prevent nodes from falsely reincarnating during the time when removal
+     * gossip gets propagated to all nodes */
+    private final Map<InetAddress, Long> justRemovedEndpoints = new ConcurrentHashMap<InetAddress, Long>();
+
+    private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
+
+    private boolean inShadowRound = false;
+
+    private class GossipTask implements Runnable
+    {
+        public void run()
+        {
+            try
+            {
+                //wait on messaging service to start listening
+                MessagingService.instance().waitUntilListening();
+
+                /* Update the local heartbeat counter. */
+                endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
+                if (logger.isTraceEnabled())
+                    logger.trace("My heartbeat is now " + endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
+                final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+                Gossiper.instance.makeRandomGossipDigest(gDigests);
+
+                if ( gDigests.size() > 0 )
+                {
+                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
+                                                                           DatabaseDescriptor.getPartitionerName(),
+                                                                           gDigests);
+                    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
+                                                                                                        digestSynMessage,
+                                                                                                        GossipDigestSyn.serializer);
+                    /* Gossip to some random live member */
+                    boolean gossipedToSeed = doGossipToLiveMember(message);
+
+                    /* Gossip to some unreachable member with some probability to check if he is back up */
+                    doGossipToUnreachableMember(message);
+
+                    /* Gossip to a seed if we did not do so above, or we have seen less nodes
+                       than there are seeds.  This prevents partitions where each group of nodes
+                       is only gossiping to a subset of the seeds.
+
+                       The most straightforward check would be to check that all the seeds have been
+                       verified either as live or unreachable.  To avoid that computation each round,
+                       we reason that:
+
+                       either all the live nodes are seeds, in which case non-seeds that come online
+                       will introduce themselves to a member of the ring by definition,
+
+                       or there is at least one non-seed node in the list, in which case eventually
+                       someone will gossip to it, and then do a gossip to a random seed from the
+                       gossipedToSeed check.
+
+                       See CASSANDRA-150 for more exposition. */
+                    if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
+                        doGossipToSeed(message);
+
+                    if (logger.isTraceEnabled())
+                        logger.trace("Performing status check ...");
+                    doStatusCheck();
+                }
+            }
+            catch (Exception e)
+            {
+                logger.error("Gossip error", e);
+            }
+        }
+    }
+
+    private Gossiper()
+    {
+        // FatClientTimeout is half of QUARANTINE_DELAY, so justRemovedEndpoints has enough leeway to prevent re-gossip
+        FatClientTimeout = (long)(QUARANTINE_DELAY / 2);
+        /* register with the Failure Detector for receiving Failure detector events */
+        FailureDetector.instance.registerFailureDetectionEventListener(this);
+
+        // Register this instance with JMX under MBEAN_NAME; a registration failure aborts construction
+        try
+        {
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean seenAnySeed()
+    {
+        for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet())
+        {
+            if (seeds.contains(entry.getKey()))
+                return true;
+            try
+            {
+                if (entry.getValue().getApplicationStateMap().containsKey(ApplicationState.INTERNAL_IP) && seeds.contains(InetAddress.getByName(entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP).value)))
+                    return true;
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Register for interesting state changes.
+     * @param subscriber module which implements the IEndpointStateChangeSubscriber
+     */
+    public void register(IEndpointStateChangeSubscriber subscriber)
+    {
+        subscribers.add(subscriber);
+    }
+
+    /**
+     * Unregister interest for state changes.
+     * @param subscriber module which implements the IEndpointStateChangeSubscriber
+     */
+    public void unregister(IEndpointStateChangeSubscriber subscriber)
+    {
+        subscribers.remove(subscriber);
+    }
+
+    public Set<InetAddress> getLiveMembers()
+    {
+        Set<InetAddress> liveMembers = new HashSet<InetAddress>(liveEndpoints);
+        if (!liveMembers.contains(FBUtilities.getBroadcastAddress()))
+            liveMembers.add(FBUtilities.getBroadcastAddress());
+        return liveMembers;
+    }
+
+    public Set<InetAddress> getLiveTokenOwners()
+    {
+        Set<InetAddress> tokenOwners = new HashSet<InetAddress>();
+        for (InetAddress member : getLiveMembers())
+        {
+            EndpointState epState = endpointStateMap.get(member);
+            if (epState != null && !isDeadState(epState) && StorageService.instance.getTokenMetadata().isMember(member))
+                tokenOwners.add(member);
+        }
+        return tokenOwners;
+    }
+
+    public Set<InetAddress> getUnreachableMembers()
+    {
+        return unreachableEndpoints.keySet();
+    }
+
+    public long getEndpointDowntime(InetAddress ep)
+    {
+        Long downtime = unreachableEndpoints.get(ep);
+        if (downtime != null)
+            return System.currentTimeMillis() - downtime;
+        else
+            return 0L;
+    }
+
+    /**
+     * This method is part of IFailureDetectionEventListener interface. This is invoked
+     * by the Failure Detector when it convicts an end point.
+     *
+     * @param endpoint end point that is convicted.
+    */
+    public void convict(InetAddress endpoint, double phi)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        if (epState == null)
+            return; // no gossip state for this endpoint (e.g. already evicted): nothing to mark down
+        if (epState.isAlive() && !isDeadState(epState))
+            markDead(endpoint, epState); // full transition for a live endpoint that is not in a dead state
+        else
+            epState.markDead(); // already down or in a dead state: only flag the state itself
+    }
+
+    /**
+     * Return either: the greatest heartbeat or application state
+     * @param epState
+     * @return
+     */
+    int getMaxEndpointStateVersion(EndpointState epState)
+    {
+        int maxVersion = epState.getHeartBeatState().getHeartBeatVersion();
+        for (VersionedValue value : epState.getApplicationStateMap().values())
+            maxVersion = Math.max(maxVersion,  value.version);
+        return maxVersion;
+    }
+
+    /**
+     * Removes the endpoint from gossip completely
+     *
+     * @param endpoint endpoint to be removed from the current membership.
+    */
+    private void evictFromMembership(InetAddress endpoint)
+    {
+        unreachableEndpoints.remove(endpoint);
+        endpointStateMap.remove(endpoint);
+        expireTimeEndpointMap.remove(endpoint);
+        quarantineEndpoint(endpoint);
+        if (logger.isDebugEnabled())
+            logger.debug("evicting " + endpoint + " from gossip");
+    }
+
+    /**
+     * Removes the endpoint from Gossip but retains endpoint state
+     */
+    public void removeEndpoint(InetAddress endpoint)
+    {
+        // do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+            subscriber.onRemove(endpoint);
+
+        if(seeds.contains(endpoint))
+        {
+            buildSeedsList();
+            seeds.remove(endpoint);
+            logger.info("removed {} from seeds, updated seeds list = {}", endpoint, seeds);
+        }
+
+        liveEndpoints.remove(endpoint);
+        unreachableEndpoints.remove(endpoint);
+        // do not remove endpointState until the quarantine expires
+        FailureDetector.instance.remove(endpoint);
+        MessagingService.instance().resetVersion(endpoint);
+        quarantineEndpoint(endpoint);
+        MessagingService.instance().destroyConnectionPool(endpoint);
+        if (logger.isDebugEnabled())
+            logger.debug("removing endpoint " + endpoint);
+    }
+
+    /**
+     * Quarantines the endpoint for QUARANTINE_DELAY
+     * @param endpoint
+     */
+    private void quarantineEndpoint(InetAddress endpoint)
+    {
+        justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+    }
+
+    /**
+     * Remove the Endpoint and evict immediately, to avoid gossiping about this node.
+     * This should only be called when a token is taken over by a new IP address.
+     * @param endpoint The endpoint that has been replaced
+     */
+    public void replacedEndpoint(InetAddress endpoint)
+    {
+        removeEndpoint(endpoint);
+        evictFromMembership(endpoint);
+    }
+
+    /**
+     * The gossip digest is built based on randomization
+     * rather than just looping through the collection of live endpoints.
+     *
+     * @param gDigests list of Gossip Digests.
+    */
+    private void makeRandomGossipDigest(List<GossipDigest> gDigests)
+    {
+        // Snapshot and shuffle the known endpoints; the local epstate is part of endpointStateMap.
+        List<InetAddress> endpoints = new ArrayList<InetAddress>(endpointStateMap.keySet());
+        Collections.shuffle(endpoints, random);
+        for (InetAddress endpoint : endpoints)
+        {
+            EndpointState epState = endpointStateMap.get(endpoint);
+            // The endpoint can be removed from the map after the snapshot above was taken.
+            // Advertise generation 0 / version 0 for it instead of reusing the values left
+            // over from the previous loop iteration, which belong to a different endpoint.
+            int generation = 0;
+            int maxVersion = 0;
+            if (epState != null)
+            {
+                generation = epState.getHeartBeatState().getGeneration();
+                maxVersion = getMaxEndpointStateVersion(epState);
+            }
+            gDigests.add(new GossipDigest(endpoint, generation, maxVersion));
+        }
+
+        if (logger.isTraceEnabled())
+        {
+            StringBuilder sb = new StringBuilder();
+            for (GossipDigest gDigest : gDigests)
+            {
+                sb.append(gDigest).append(" ");
+            }
+            logger.trace("Gossip Digests are : " + sb.toString());
+        }
+    }
+
+    /**
+     * This method will begin removing an existing endpoint from the cluster by spoofing its state
+     * This should never be called unless this coordinator has had 'removenode' invoked
+     *
+     * @param endpoint - the endpoint being removed
+     * @param hostId - the ID of the host being removed
+     * @param localHostId - my own host ID for replication coordination
+     */
+    public void advertiseRemoving(InetAddress endpoint, UUID hostId, UUID localHostId)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        // remember this node's generation
+        int generation = epState.getHeartBeatState().getGeneration();
+        logger.info("Removing host: {}", hostId);
+        logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change");
+        try
+        {
+            Thread.sleep(StorageService.RING_DELAY);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        // make sure it did not change
+        epState = endpointStateMap.get(endpoint);
+        if (epState.getHeartBeatState().getGeneration() != generation)
+            throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it");
+        // update the other node's generation to mimic it as if it had changed it itself
+        logger.info("Advertising removal for " + endpoint);
+        epState.updateTimestamp(); // make sure we don't evict it too soon
+        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
+        epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId));
+        endpointStateMap.put(endpoint, epState);
+    }
+
+    /**
+     * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
+     * This should only be called after advertiseRemoving
+     * @param endpoint
+     * @param hostId
+     */
+    public void advertiseTokenRemoved(InetAddress endpoint, UUID hostId)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        epState.updateTimestamp(); // make sure we don't evict it too soon
+        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        long expireTime = computeExpireTime();
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(hostId, expireTime));
+        logger.info("Completing removal of " + endpoint);
+        addExpireTimeForEndpoint(endpoint, expireTime);
+        endpointStateMap.put(endpoint, epState);
+        // ensure at least one gossip round occurs before returning
+        try
+        {
+            Thread.sleep(intervalInMillis * 2);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    /**
+     * Do not call this method unless you know what you are doing.
+     * It will try extremely hard to obliterate any endpoint from the ring,
+     * even if it does not know about it.
+     * This should only ever be called by human via JMX.
+     * @param  address
+     * @throws UnknownHostException
+     */
+    public void unsafeAssassinateEndpoint(String address) throws UnknownHostException
+    {
+        InetAddress endpoint = InetAddress.getByName(address);
+        EndpointState epState = endpointStateMap.get(endpoint);
+        Collection<Token> tokens = null;
+        logger.warn("Assassinating {} via gossip", endpoint);
+        if (epState == null)
+        {
+            epState = new EndpointState(new HeartBeatState((int)((System.currentTimeMillis() + 60000) / 1000), 9999));
+        }
+        else
+        {
+            try
+            {
+                tokens = StorageService.instance.getTokenMetadata().getTokens(endpoint);
+            }
+            catch (AssertionError e)
+            {
+            }
+            int generation = epState.getHeartBeatState().getGeneration();
+            logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change");
+            try
+            {
+                Thread.sleep(StorageService.RING_DELAY);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+            // make sure it did not change
+            EndpointState newState = endpointStateMap.get(endpoint);
+            if (newState == null)
+                logger.warn("Endpoint {} disappeared while trying to assassinate, continuing anyway", endpoint);
+            else if (newState.getHeartBeatState().getGeneration() != generation)
+                throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it");
+            epState.updateTimestamp(); // make sure we don't evict it too soon
+            epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        }
+        if (tokens == null)
+            tokens = Arrays.asList(StorageService.instance.getBootstrapToken());
+        // do not pass go, do not collect 200 dollars, just gtfo
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(tokens, computeExpireTime()));
+        handleMajorStateChange(endpoint, epState);
+        try
+        {
+            Thread.sleep(intervalInMillis * 4);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        logger.warn("Finished assassinating {}", endpoint);
+    }
+
+    public boolean isKnownEndpoint(InetAddress endpoint)
+    {
+        return endpointStateMap.containsKey(endpoint);
+    }
+
+    public int getCurrentGenerationNumber(InetAddress endpoint)
+    {
+        return endpointStateMap.get(endpoint).getHeartBeatState().getGeneration();
+    }
+
+    /**
+     * Returns true if the chosen target was also a seed. False otherwise
+     *
+     *
+     * @param message
+     * @param epSet a set of endpoint from which a random endpoint is chosen.
+     *  @return true if the chosen endpoint is also a seed.
+     */
+    private boolean sendGossip(MessageOut<GossipDigestSyn> message, Set<InetAddress> epSet)
+    {
+        // Copy the candidates first so the size check and the indexed pick see the same elements.
+        List<InetAddress> candidates = ImmutableList.copyOf(epSet);
+        if (candidates.isEmpty())
+            return false;
+
+        /* Pick a random index in [0, size); the RNG is not consulted when there is a single candidate */
+        int pick = candidates.size() == 1 ? 0 : random.nextInt(candidates.size());
+        InetAddress target = candidates.get(pick);
+        if (logger.isTraceEnabled())
+            logger.trace("Sending a GossipDigestSyn to {} ...", target);
+        MessagingService.instance().sendOneWay(message, target);
+        return seeds.contains(target);
+    }
+
+    /* Sends a Gossip message to a live member and returns true if the recipient was a seed */
+    private boolean doGossipToLiveMember(MessageOut<GossipDigestSyn> message)
+    {
+        int size = liveEndpoints.size();
+        if ( size == 0 )
+            return false;
+        return sendGossip(message, liveEndpoints);
+    }
+
+    /* Sends a Gossip message to an unreachable member */
+    private void doGossipToUnreachableMember(MessageOut<GossipDigestSyn> message)
+    {
+        double liveEndpointCount = liveEndpoints.size();
+        double unreachableEndpointCount = unreachableEndpoints.size();
+        if ( unreachableEndpointCount > 0 )
+        {
+            /* based on some probability */
+            double prob = unreachableEndpointCount / (liveEndpointCount + 1);
+            double randDbl = random.nextDouble();
+            if ( randDbl < prob )
+                sendGossip(message, unreachableEndpoints.keySet());
+        }
+    }
+
+    /* Gossip to a seed for facilitating partition healing */
+    private void doGossipToSeed(MessageOut<GossipDigestSyn> prod)
+    {
+        int size = seeds.size();
+        if ( size > 0 )
+        {
+            if ( size == 1 && seeds.contains(FBUtilities.getBroadcastAddress()) )
+            {
+                return;
+            }
+
+            if ( liveEndpoints.size() == 0 )
+            {
+                sendGossip(prod, seeds);
+            }
+            else
+            {
+                /* Gossip with the seed with some probability. */
+                double probability = seeds.size() / (double)( liveEndpoints.size() + unreachableEndpoints.size() );
+                double randDbl = random.nextDouble();
+                if ( randDbl <= probability )
+                    sendGossip(prod, seeds);
+            }
+        }
+    }
+
+    public boolean isFatClient(InetAddress endpoint)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        if (epState == null)
+        {
+            return false;
+        }
+        return !isDeadState(epState) && !StorageService.instance.getTokenMetadata().isMember(endpoint);
+    }
+
+    private void doStatusCheck()
+    {
+        long now = System.currentTimeMillis();
+
+        Set<InetAddress> eps = endpointStateMap.keySet();
+        for ( InetAddress endpoint : eps )
+        {
+            if ( endpoint.equals(FBUtilities.getBroadcastAddress()) )
+                continue;
+
+            FailureDetector.instance.interpret(endpoint);
+            EndpointState epState = endpointStateMap.get(endpoint);
+            if ( epState != null )
+            {
+                long duration = now - epState.getUpdateTimestamp();
+
+                // check if this is a fat client. fat clients are removed automatically from
+                // gossip after FatClientTimeout.  Do not remove dead states here.
+                if (isFatClient(endpoint) && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout))
+                {
+                    logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
+                    removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay
+                    evictFromMembership(endpoint); // can get rid of the state immediately
+                }
+
+                // check for dead state removal
+                long expireTime = getExpireTimeForEndpoint(endpoint);
+                if (!epState.isAlive() && (now > expireTime)
+                        && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
+                {
+                    if (logger.isDebugEnabled())
+                    {
+                        logger.debug("time is expiring for endpoint : " + endpoint + " (" + expireTime + ")");
+                    }
+                    evictFromMembership(endpoint);
+                }
+            }
+        }
+
+        if (!justRemovedEndpoints.isEmpty())
+        {
+            for (Entry<InetAddress, Long> entry : justRemovedEndpoints.entrySet())
+            {
+                if ((now - entry.getValue()) > QUARANTINE_DELAY)
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
+                    justRemovedEndpoints.remove(entry.getKey());
+                }
+            }
+        }
+    }
+
+    protected long getExpireTimeForEndpoint(InetAddress endpoint)
+    {
+        /* default expireTime is aVeryLongTime */
+        Long storedTime = expireTimeEndpointMap.get(endpoint);
+        return storedTime == null ? computeExpireTime() : storedTime;
+    }
+
+    public EndpointState getEndpointStateForEndpoint(InetAddress ep)
+    {
+        return endpointStateMap.get(ep);
+    }
+
+    // removes ALL endpoint states; should only be called after shadow gossip
+    public void resetEndpointStateMap()
+    {
+        endpointStateMap.clear();
+    }
+
+    public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
+    {
+        return endpointStateMap.entrySet();
+    }
+
+    public boolean usesHostId(InetAddress endpoint)
+    {
+        if (MessagingService.instance().knowsVersion(endpoint) && MessagingService.instance().getVersion(endpoint) >= MessagingService.VERSION_12)
+            return true;
+        EndpointState epState = getEndpointStateForEndpoint(endpoint); // null when gossip holds no state for the endpoint
+        VersionedValue netVersion = epState == null ? null : epState.getApplicationState(ApplicationState.NET_VERSION);
+        return netVersion != null && Integer.parseInt(netVersion.value) >= MessagingService.VERSION_12;
+    }
+
+    /**
+     * Tells whether the given endpoint uses host ids and has a TOKENS application state.
+     *
+     * @param endpoint the endpoint to check
+     * @return true if usesHostId(endpoint) holds and a TOKENS state is present; false otherwise,
+     *         including when no gossip state is known for the endpoint
+     */
+    public boolean usesVnodes(InetAddress endpoint)
+    {
+        if (!usesHostId(endpoint))
+            return false;
+        // usesHostId can succeed purely from the messaging version, so the state may be absent
+        EndpointState epState = getEndpointStateForEndpoint(endpoint);
+        return epState != null && epState.getApplicationState(ApplicationState.TOKENS) != null;
+    }
+
+    /**
+     * Reads the host id gossiped by the given endpoint.
+     *
+     * @param endpoint the endpoint whose HOST_ID application state is read
+     * @return the HOST_ID value parsed as a UUID
+     * @throws RuntimeException if usesHostId(endpoint) is false
+     */
+    public UUID getHostId(InetAddress endpoint)
+    {
+        if (usesHostId(endpoint))
+        {
+            String rawHostId = getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value;
+            return UUID.fromString(rawHostId);
+        }
+        throw new RuntimeException("Host " + endpoint + " does not use new-style tokens!");
+    }
+
+    /**
+     * Builds the part of an endpoint's state that is newer than the given version.
+     *
+     * @param forEndpoint the endpoint whose local state is inspected
+     * @param version only the heartbeat / application states with a strictly greater version qualify
+     * @return a new EndpointState carrying the qualifying application states, or null when the
+     *         endpoint is unknown or nothing is newer than version
+     */
+    EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
+    {
+        EndpointState known = endpointStateMap.get(forEndpoint);
+        if (known == null)
+            return null;
+
+        EndpointState delta = null;
+
+        /*
+         * The heartbeat is included on its own merit only when it is newer than the requested
+         * version. If it is older but some application state is newer, the (old) heartbeat is
+         * still sent along below and the receiver discards it if redundant.
+         */
+        int localHbVersion = known.getHeartBeatState().getHeartBeatVersion();
+        if (localHbVersion > version)
+        {
+            delta = new EndpointState(known.getHeartBeatState());
+            if (logger.isTraceEnabled())
+                logger.trace("local heartbeat version " + localHbVersion + " greater than " + version + " for " + forEndpoint);
+        }
+
+        /* collect every application state whose version exceeds the requested one */
+        for (Entry<ApplicationState, VersionedValue> candidate : known.getApplicationStateMap().entrySet())
+        {
+            VersionedValue versioned = candidate.getValue();
+            if (versioned.version <= version)
+                continue;
+
+            if (delta == null)
+                delta = new EndpointState(known.getHeartBeatState());
+
+            final ApplicationState appState = candidate.getKey();
+            if (logger.isTraceEnabled())
+                logger.trace("Adding state " + appState + ": " + versioned.value);
+            delta.addApplicationState(appState, versioned);
+        }
+        return delta;
+    }
+
+    /** determine which endpoint started up earlier */
+    public int compareEndpointStartup(InetAddress addr1, InetAddress addr2)
+    {
+        EndpointState ep1 = getEndpointStateForEndpoint(addr1);
+        EndpointState ep2 = getEndpointStateForEndpoint(addr2);
+        assert ep1 != null && ep2 != null;
+        return ep1.getHeartBeatState().getGeneration() - ep2.getHeartBeatState().getGeneration();
+    }
+
+    /**
+     * Runs the single-endpoint notifyFailureDetector for every entry of the given map.
+     *
+     * @param remoteEpStateMap endpoint states received from a remote node, keyed by endpoint
+     */
+    void notifyFailureDetector(Map<InetAddress, EndpointState> remoteEpStateMap)
+    {
+        for (Entry<InetAddress, EndpointState> remote : remoteEpStateMap.entrySet())
+        {
+            InetAddress endpoint = remote.getKey();
+            EndpointState remoteState = remote.getValue();
+            notifyFailureDetector(endpoint, remoteState);
+        }
+    }
+
+    /**
+     * Reports an endpoint to the failure detector when the remote state is newer than ours.
+     * Nothing happens if the endpoint is unknown locally or the remote generation is older.
+     *
+     * @param endpoint the endpoint the remote state describes
+     * @param remoteEndpointState the state received for that endpoint
+     */
+    void notifyFailureDetector(InetAddress endpoint, EndpointState remoteEndpointState)
+    {
+        EndpointState known = endpointStateMap.get(endpoint);
+        if (known == null)
+            return; // no local state to compare against
+
+        IFailureDetector detector = FailureDetector.instance;
+        int knownGeneration = known.getHeartBeatState().getGeneration();
+        int reportedGeneration = remoteEndpointState.getHeartBeatState().getGeneration();
+
+        if (reportedGeneration > knownGeneration)
+        {
+            known.updateTimestamp();
+            // the node was dead and its generation changed: a reboot, or possibly a takeover,
+            // so forget the recorded fd intervals and relearn them
+            if (!known.isAlive())
+            {
+                logger.debug("Clearing interval times for {} due to generation change", endpoint);
+                detector.remove(endpoint);
+            }
+            detector.report(endpoint);
+        }
+        else if (reportedGeneration == knownGeneration)
+        {
+            int knownVersion = getMaxEndpointStateVersion(known);
+            int reportedVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion();
+            if (reportedVersion > knownVersion)
+            {
+                // same generation, newer heartbeat: a plain version bump
+                known.updateTimestamp();
+                detector.report(endpoint);
+            }
+        }
+    }
+
+    /**
+     * Flags an endpoint as alive and tells the subscribers.
+     * Marks localState alive and refreshes its timestamp, adds addr to liveEndpoints, removes it
+     * from unreachableEndpoints and expireTimeEndpointMap, then calls onAlive on every subscriber.
+     *
+     * @param addr the endpoint being marked alive
+     * @param localState the state object for addr that is updated and handed to the subscribers
+     */
+    private void markAlive(InetAddress addr, EndpointState localState)
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("marking as alive {}", addr);
+        localState.markAlive();
+        localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
+        liveEndpoints.add(addr);
+        unreachableEndpoints.remove(addr);
+        // a live endpoint no longer has a pending expiry
+        expireTimeEndpointMap.remove(addr);
+        logger.debug("removing expire time for endpoint : " + addr);
+        logger.info("InetAddress {} is now UP", addr);
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+            subscriber.onAlive(addr, localState);
+        if (logger.isTraceEnabled())
+            logger.trace("Notified " + subscribers);
+    }
+
+    /**
+     * Flags an endpoint as dead and tells the subscribers.
+     * Marks localState dead, removes addr from liveEndpoints, records it in unreachableEndpoints
+     * with the current wall-clock time, then calls onDead on every subscriber.
+     *
+     * @param addr the endpoint being marked dead
+     * @param localState the state object for addr that is updated and handed to the subscribers
+     */
+    private void markDead(InetAddress addr, EndpointState localState)
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("marking as down {}", addr);
+        localState.markDead();
+        liveEndpoints.remove(addr);
+        // value is the time (ms) at which the endpoint was recorded as unreachable
+        unreachableEndpoints.put(addr, System.currentTimeMillis());
+        logger.info("InetAddress {} is now DOWN", addr);
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+            subscriber.onDead(addr, localState);
+        if (logger.isTraceEnabled())
+            logger.trace("Notified " + subscribers);
+    }
+
+    /**
+     * This method is called whenever there is a "big" change in ep state: a generation change for
+     * a known node, or (see applyStateLocally) the first state seen for an unknown node.
+     * The given state replaces whatever endpointStateMap held for the endpoint. Subscribers are
+     * then notified in the order onRestart, onAlive/onDead (via markAlive/markDead), onJoin.
+     *
+     * @param ep endpoint
+     * @param epState EndpointState for the endpoint
+     */
+    private void handleMajorStateChange(InetAddress ep, EndpointState epState)
+    {
+        if (!isDeadState(epState))
+        {
+            // an existing entry means we knew this node before, i.e. it restarted
+            if (endpointStateMap.get(ep) != null)
+                logger.info("Node {} has restarted, now UP", ep);
+            else
+                logger.info("Node {} is now part of the cluster", ep);
+        }
+        if (logger.isTraceEnabled())
+            logger.trace("Adding endpoint state for " + ep);
+        endpointStateMap.put(ep, epState);
+
+        // the node restarted: it is up to the subscriber to take whatever action is necessary
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+            subscriber.onRestart(ep, epState);
+
+        // liveness follows the STATUS carried by the new state
+        if (!isDeadState(epState))
+            markAlive(ep, epState);
+        else
+        {
+            logger.debug("Not marking " + ep + " alive due to dead state");
+            markDead(ep, epState);
+        }
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+            subscriber.onJoin(ep, epState);
+    }
+
+    /**
+     * Checks whether the STATUS application state of the given endpoint state names a dead state.
+     *
+     * @param epState the endpoint state to inspect
+     * @return true if the first delimiter-separated piece of the STATUS value equals one of
+     *         DEAD_STATES; false otherwise, including when there is no STATUS at all
+     */
+    public Boolean isDeadState(EndpointState epState)
+    {
+        if (epState.getApplicationState(ApplicationState.STATUS) == null)
+            return false;
+
+        // the status value is a delimited string whose first piece is the state name
+        String status = epState.getApplicationState(ApplicationState.STATUS).value;
+        String[] pieces = status.split(VersionedValue.DELIMITER_STR, -1);
+        assert (pieces.length > 0);
+        String state = pieces[0];
+
+        boolean dead = false;
+        for (String deadstate : DEAD_STATES)
+        {
+            if (state.equals(deadstate))
+            {
+                dead = true;
+                break;
+            }
+        }
+        return dead;
+    }
+
+    /**
+     * Merges endpoint states received from a remote node into the local view.
+     * Entries for the local broadcast address are skipped unless a shadow round is in progress,
+     * and entries for endpoints in justRemovedEndpoints (quarantined) are always skipped.
+     *
+     * @param epStateMap remote endpoint states keyed by endpoint
+     */
+    void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
+    {
+        for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
+        {
+            InetAddress ep = entry.getKey();
+            // never take our own state from others, except while in the shadow round
+            if ( ep.equals(FBUtilities.getBroadcastAddress()) && !isInShadowRound())
+                continue;
+            if (justRemovedEndpoints.containsKey(ep))
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("Ignoring gossip for " + ep + " because it is quarantined");
+                continue;
+            }
+
+            EndpointState localEpStatePtr = endpointStateMap.get(ep);
+            EndpointState remoteState = entry.getValue();
+            /*
+                If state does not exist just add it. If it does then add it if the remote generation is greater.
+                If there is a generation tie, attempt to break it by heartbeat version.
+            */
+            if ( localEpStatePtr != null )
+            {
+                int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration();
+                int remoteGeneration = remoteState.getHeartBeatState().getGeneration();
+                if (logger.isTraceEnabled())
+                    logger.trace(ep + "local generation " + localGeneration + ", remote generation " + remoteGeneration);
+
+                if (remoteGeneration > localGeneration)
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("Updating heartbeat state generation to " + remoteGeneration + " from " + localGeneration + " for " + ep);
+                    // major state change will handle the update by inserting the remote state directly
+                    handleMajorStateChange(ep, remoteState);
+                }
+                else if ( remoteGeneration == localGeneration ) // generation has not changed, apply new states
+                {
+                    /* find maximum state */
+                    int localMaxVersion = getMaxEndpointStateVersion(localEpStatePtr);
+                    int remoteMaxVersion = getMaxEndpointStateVersion(remoteState);
+                    if ( remoteMaxVersion > localMaxVersion )
+                    {
+                        // apply states, but do not notify since there is no major change
+                        applyNewStates(ep, localEpStatePtr, remoteState);
+                    }
+                    else if (logger.isTraceEnabled())
+                            logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
+                    // locally marked down but its STATUS is not a dead state: bring it back up
+                    if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead
+                        markAlive(ep, localEpStatePtr);
+                }
+                else
+                {
+                    // remote generation is older than what we already have
+                    if (logger.isTraceEnabled())
+                        logger.trace("Ignoring remote generation " + remoteGeneration + " < " + localGeneration);
+                }
+            }
+            else
+            {
+                // this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive
+                FailureDetector.instance.report(ep);
+                handleMajorStateChange(ep, remoteState);
+            }
+        }
+    }
+
+    /**
+     * Copies the heartbeat and every application state of remoteState into localState, then
+     * fires an onChange notification for each copied application state.
+     *
+     * @param addr the endpoint both states belong to
+     * @param localState the locally held state, updated in place
+     * @param remoteState the state received from a remote node; its generation is asserted to
+     *                    equal the local one for every application state applied
+     */
+    private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState)
+    {
+        // don't assert here, since if the node restarts the version will go back to zero
+        int previousVersion = localState.getHeartBeatState().getHeartBeatVersion();
+
+        localState.setHeartBeatState(remoteState.getHeartBeatState());
+        if (logger.isTraceEnabled())
+            logger.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + previousVersion + " for " + addr + " ...");
+
+        // two passes on purpose: apply everything first, notify afterwards, so that all states of
+        // this update are present and current by the time subscribers are called
+        for (Entry<ApplicationState, VersionedValue> incoming : remoteState.getApplicationStateMap().entrySet())
+        {
+            assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
+            localState.addApplicationState(incoming.getKey(), incoming.getValue());
+        }
+        for (Entry<ApplicationState, VersionedValue> incoming : remoteState.getApplicationStateMap().entrySet())
+        {
+            ApplicationState changedKey = incoming.getKey();
+            VersionedValue changedValue = incoming.getValue();
+            doNotifications(addr, changedKey, changedValue);
+        }
+    }
+
+    /**
+     * Tells every subscriber that an application state of the given endpoint has changed.
+     *
+     * @param addr the endpoint whose state changed
+     * @param state the application state key that changed
+     * @param value the new value for that key
+     */
+    private void doNotifications(InetAddress addr, ApplicationState state, VersionedValue value)
+    {
+        for (IEndpointStateChangeSubscriber listener : subscribers)
+            listener.onChange(addr, state, value);
+    }
+
+    /**
+     * Requests all the state for the endpoint in gDigest by adding a digest with max version 0
+     * to the delta list.
+     *
+     * @param gDigest the digest naming the endpoint
+     * @param deltaGossipDigestList the list of digests being requested; appended to
+     * @param remoteGeneration the generation to put into the request digest
+     */
+    private void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration)
+    {
+        /* version 0 in the request digest asks for everything */
+        GossipDigest everything = new GossipDigest(gDigest.getEndpoint(), remoteGeneration, 0);
+        deltaGossipDigestList.add(everything);
+        if (logger.isTraceEnabled())
+            logger.trace("requestAll for " + gDigest.getEndpoint());
+    }
+
+    /**
+     * Queues all local data for the digest's endpoint with version greater than maxRemoteVersion.
+     * Nothing is queued when getStateForVersionBiggerThan yields null.
+     *
+     * @param gDigest the digest naming the endpoint
+     * @param deltaEpStateMap the map of states to send; the endpoint's delta is put into it
+     * @param maxRemoteVersion the highest version the remote side already has
+     */
+    private void sendAll(GossipDigest gDigest, Map<InetAddress, EndpointState> deltaEpStateMap, int maxRemoteVersion)
+    {
+        EndpointState newerState = getStateForVersionBiggerThan(gDigest.getEndpoint(), maxRemoteVersion);
+        if (newerState == null)
+            return;
+        deltaEpStateMap.put(gDigest.getEndpoint(), newerState);
+    }
+
+    /**
+     * Works out the state that the Gossiper has but the Gossipee doesn't, and vice versa.
+     * Builds up the delta digests (what we want from the sender) and the delta state (what we
+     * will send back).
+     *
+     * @param gDigestList digests received in the syn; filled with one digest per known endpoint
+     *                    when it arrives empty (shadow request)
+     * @param deltaGossipDigestList output: digests for data we request from the sender
+     * @param deltaEpStateMap output: local states the sender is missing
+     */
+    void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
+    {
+        if (gDigestList.isEmpty())
+        {
+            /*
+             * A *completely* empty syn should normally never happen, since an endpoint at least
+             * sends a syn containing itself. So the node is attempting shadow gossip, and we
+             * reply with everything we know.
+             */
+            logger.debug("Shadow request received, adding all states");
+            for (InetAddress knownEndpoint : endpointStateMap.keySet())
+                gDigestList.add(new GossipDigest(knownEndpoint, 0, 0));
+        }
+
+        for (GossipDigest digest : gDigestList)
+        {
+            int remoteGeneration = digest.getGeneration();
+            int maxRemoteVersion = digest.getMaxVersion();
+            EndpointState localState = endpointStateMap.get(digest.getEndpoint());
+
+            if (localState == null)
+            {
+                /* we have absolutely nothing for this endpoint locally, so request everything */
+                requestAll(digest, deltaGossipDigestList, remoteGeneration);
+                continue;
+            }
+
+            int localGeneration = localState.getHeartBeatState().getGeneration();
+            /* the max version over all keys in the state associated with this endpoint */
+            int maxLocalVersion = getMaxEndpointStateVersion(localState);
+
+            if (remoteGeneration > localGeneration)
+            {
+                /* the sender has a newer incarnation: request everything from it */
+                requestAll(digest, deltaGossipDigestList, remoteGeneration);
+            }
+            else if (remoteGeneration < localGeneration)
+            {
+                /* we have the newer incarnation: send all data with version > 0 */
+                sendAll(digest, deltaEpStateMap, 0);
+            }
+            else if (maxRemoteVersion > maxLocalVersion)
+            {
+                /* same generation, sender is ahead: ask for everything past our max version */
+                deltaGossipDigestList.add(new GossipDigest(digest.getEndpoint(), remoteGeneration, maxLocalVersion));
+            }
+            else if (maxRemoteVersion < maxLocalVersion)
+            {
+                /* same generation, we are ahead: send everything past the sender's max version */
+                sendAll(digest, deltaEpStateMap, maxRemoteVersion);
+            }
+            /* same generation and same max version: both sides agree, nothing to exchange */
+        }
+    }
+
+    /**
+     * Starts the gossiper with the given generation number and no preloaded application states.
+     *
+     * @param generationNumber generation number for the local heartbeat state
+     */
+    public void start(int generationNumber)
+    {
+        Map<ApplicationState, VersionedValue> nothingPreloaded = new HashMap<ApplicationState, VersionedValue>();
+        start(generationNumber, nothingPreloaded);
+    }
+
+    /**
+     * Starts the gossiper: builds the seed list, initializes the local heartbeat state if needed,
+     * preloads the given application states into the local endpoint state, notifies the snitch
+     * and schedules the periodic gossip task.
+     *
+     * @param generationNbr generation number for the local heartbeat state
+     * @param preloadLocalStates application states added to the local state before gossip starts
+     */
+    public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates)
+    {
+        buildSeedsList();
+        /* make sure this node has a heartbeat state of its own */
+        maybeInitializeLocalState(generationNbr);
+        EndpointState self = endpointStateMap.get(FBUtilities.getBroadcastAddress());
+        for (Map.Entry<ApplicationState, VersionedValue> preload : preloadLocalStates.entrySet())
+        {
+            self.addApplicationState(preload.getKey(), preload.getValue());
+        }
+
+        // let the snitch know that gossip is about to begin
+        DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
+        if (logger.isTraceEnabled())
+        {
+            logger.trace("gossip started with generation " + self.getHeartBeatState().getGeneration());
+        }
+
+        // initial delay and period are both the gossip interval
+        scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(), Gossiper.intervalInMillis, Gossiper.intervalInMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Does a single 'shadow' round of gossip, where we do not modify any state.
+     * Only used when replacing a node, to get and assume its states.
+     * An empty syn is sent to every seed, then the calling thread polls once a second until
+     * inShadowRound has been cleared.
+     *
+     * @throws RuntimeException if the round is still pending after more than
+     *         StorageService.RING_DELAY milliseconds of waiting, or if the wait is interrupted
+     */
+    public void doShadowRound()
+    {
+        buildSeedsList();
+
+        // a completely empty syn is what marks the request as shadow gossip
+        List<GossipDigest> noDigests = new ArrayList<GossipDigest>();
+        GossipDigestSyn emptySyn = new GossipDigestSyn(DatabaseDescriptor.getClusterName(), DatabaseDescriptor.getPartitionerName(), noDigests);
+        MessageOut<GossipDigestSyn> synMessage = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN, emptySyn, GossipDigestSyn.serializer);
+
+        inShadowRound = true;
+        for (InetAddress seed : seeds)
+        {
+            MessagingService.instance().sendOneWay(synMessage, seed);
+        }
+
+        int waitedMillis = 0;
+        try
+        {
+            do
+            {
+                Thread.sleep(1000);
+                if (!inShadowRound)
+                    return; // the round has been finished elsewhere
+                waitedMillis += 1000;
+            }
+            while (waitedMillis <= StorageService.RING_DELAY);
+            throw new RuntimeException("Unable to gossip with any seeds");
+        }
+        catch (InterruptedException interrupted)
+        {
+            throw new RuntimeException(interrupted);
+        }
+    }
+
+    /**
+     * Adds every configured seed except this node's own broadcast address to seeds.
+     */
+    private void buildSeedsList()
+    {
+        for (InetAddress candidate : DatabaseDescriptor.getSeeds())
+        {
+            // never treat ourselves as a seed to gossip with
+            if (!candidate.equals(FBUtilities.getBroadcastAddress()))
+                seeds.add(candidate);
+        }
+    }
+
+    /**
+     * Initializes the local heartbeat state if needed, i.e. if the gossiper has never been
+     * started before. An existing entry for the local broadcast address is left untouched.
+     *
+     * @param generationNbr generation number for the new heartbeat state
+     */
+    public void maybeInitializeLocalState(int generationNbr)
+    {
+        EndpointState freshLocalState = new EndpointState(new HeartBeatState(generationNbr));
+        freshLocalState.markAlive();
+        // putIfAbsent: only installed when nothing is there yet
+        endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), freshLocalState);
+    }
+
+
+    /**
+     * Adds an endpoint we knew about previously, but whose state is unknown.
+     * The endpoint is stored with a generation-0 heartbeat, marked dead and recorded as
+     * unreachable as of now. The local broadcast address is rejected.
+     *
+     * @param ep the previously known endpoint
+     */
+    public void addSavedEndpoint(InetAddress ep)
+    {
+        if (ep.equals(FBUtilities.getBroadcastAddress()))
+        {
+            logger.debug("Attempt to add self as saved endpoint");
+            return;
+        }
+
+        EndpointState saved = endpointStateMap.get(ep);
+        if (saved == null)
+        {
+            saved = new EndpointState(new HeartBeatState(0));
+        }
+        else
+        {
+            // keep whatever in-memory data we already hold for the endpoint (such as DC, RACK,
+            // and so on) and only reset its heartbeat
+            logger.debug("not replacing a previous epState for {}, but reusing it: {}", ep, saved);
+            saved.setHeartBeatState(new HeartBeatState(0));
+        }
+
+        saved.markDead();
+        endpointStateMap.put(ep, saved);
+        unreachableEndpoints.put(ep, System.currentTimeMillis());
+        if (logger.isTraceEnabled())
+        {
+            logger.trace("Adding saved endpoint " + ep + " " + saved.getHeartBeatState().getGeneration());
+        }
+    }
+
+    /**
+     * Adds an application state to this node's own endpoint state and notifies the subscribers.
+     * The local endpoint state is asserted to exist already.
+     *
+     * @param state the application state key
+     * @param value the value to store for that key
+     */
+    public void addLocalApplicationState(ApplicationState state, VersionedValue value)
+    {
+        EndpointState ownState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
+        assert ownState != null;
+        ownState.addApplicationState(state, value);
+        // subscribers hear about local changes the same way as remote ones
+        doNotifications(FBUtilities.getBroadcastAddress(), state, value);
+    }
+
+    /**
+     * Stops gossiping: cancels the scheduled gossip task (if one was ever scheduled), waits two
+     * gossip intervals, then sends a GOSSIP_SHUTDOWN message to every live endpoint.
+     *
+     * @throws RuntimeException if the wait is interrupted; the thread's interrupt flag is
+     *         restored before throwing
+     */
+    public void stop()
+    {
+        // the task is null until start() has run; isEnabled() guards against the same case
+        if (scheduledGossipTask != null)
+            scheduledGossipTask.cancel(false);
+        logger.info("Announcing shutdown");
+        try
+        {
+            Thread.sleep(intervalInMillis * 2);
+        }
+        catch (InterruptedException e)
+        {
+            // keep the interruption visible to callers further up the stack
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN);
+        for (InetAddress ep : liveEndpoints)
+            MessagingService.instance().sendOneWay(message, ep);
+    }
+
+    /**
+     * @return true if a gossip task has been scheduled and has not been cancelled
+     */
+    public boolean isEnabled()
+    {
+        if (scheduledGossipTask == null)
+            return false;
+        return !scheduledGossipTask.isCancelled();
+    }
+
+    /**
+     * Clears the inShadowRound flag if it is currently set; does nothing otherwise.
+     */
+    protected void finishShadowRound()
+    {
+        if (!inShadowRound)
+            return;
+        inShadowRound = false;
+    }
+
+    protected boolean isInShadowRound()
+    {
+        return inShadowRound;
+    }
+
+    /**
+     * Test hook: makes sure an endpoint state exists for addr (creating an alive one with the
+     * given generation if absent) and sets its NET_VERSION and HOST_ID application states.
+     *
+     * @param addr the endpoint to initialize
+     * @param uuid the host id to record for the endpoint
+     * @param generationNbr generation used only when a new state has to be created
+     */
+    @VisibleForTesting
+    public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr)
+    {
+        EndpointState fresh = new EndpointState(new HeartBeatState(generationNbr));
+        fresh.markAlive();
+
+        // putIfAbsent hands back the state that was already there, if any
+        EndpointState target = endpointStateMap.putIfAbsent(addr, fresh);
+        if (target == null)
+            target = fresh;
+
+        // the version and host id states are always (re)written
+        target.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+        target.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+    }
+
+    /**
+     * Test hook: adds an application state directly to the state held for the given endpoint.
+     * No subscribers are notified.
+     *
+     * @param endpoint the endpoint whose state is modified; a state must already exist for it
+     * @param state the application state key
+     * @param value the value to store for that key
+     */
+    @VisibleForTesting
+    public void injectApplicationState(InetAddress endpoint, ApplicationState state, VersionedValue value)
+    {
+        EndpointState target = endpointStateMap.get(endpoint);
+        target.addApplicationState(state, value);
+    }
+
+    /**
+     * String-address variant of getEndpointDowntime(InetAddress).
+     *
+     * @param address host name or textual IP address of the endpoint
+     * @return whatever the InetAddress overload returns for the resolved address
+     * @throws UnknownHostException if the address cannot be resolved
+     */
+    public long getEndpointDowntime(String address) throws UnknownHostException
+    {
+        InetAddress resolved = InetAddress.getByName(address);
+        return getEndpointDowntime(resolved);
+    }
+
+    /**
+     * String-address variant of getCurrentGenerationNumber(InetAddress).
+     *
+     * @param address host name or textual IP address of the endpoint
+     * @return whatever the InetAddress overload returns for the resolved address
+     * @throws UnknownHostException if the address cannot be resolved
+     */
+    public int getCurrentGenerationNumber(String address) throws UnknownHostException
+    {
+        InetAddress resolved = InetAddress.getByName(address);
+        return getCurrentGenerationNumber(resolved);
+    }
+
+    /**
+     * Records the expire time for an endpoint in expireTimeEndpointMap, replacing any earlier value.
+     *
+     * @param endpoint the endpoint the expire time applies to
+     * @param expireTime the expire time to store
+     */
+    public void addExpireTimeForEndpoint(InetAddress endpoint, long expireTime)
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("adding expire time for endpoint : " + endpoint + " (" + expireTime + ")");
+        expireTimeEndpointMap.put(endpoint, expireTime);
+    }
+
+    /**
+     * @return the current wall-clock time in milliseconds plus Gossiper.aVeryLongTime
+     */
+    public static long computeExpireTime() {
+        final long now = System.currentTimeMillis();
+        return now + Gossiper.aVeryLongTime;
+    }
+
+}