You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/11/11 21:08:56 UTC

[10/22] cassandra git commit: 10089 - 2.2 patch

10089 - 2.2 patch


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6bb6bb00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6bb6bb00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6bb6bb00

Branch: refs/heads/cassandra-3.0
Commit: 6bb6bb005197c33fa94026d472ff78d4f36613cc
Parents: 87fe1e0
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Nov 11 15:04:25 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Nov 11 15:04:25 2015 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/EndpointState.java |  76 ++++++---
 .../apache/cassandra/gms/FailureDetector.java   |   7 +-
 src/java/org/apache/cassandra/gms/Gossiper.java |  47 +++---
 .../apache/cassandra/gms/VersionedValue.java    |   5 +
 .../cassandra/service/StorageService.java       |  65 ++++----
 .../apache/cassandra/gms/EndpointStateTest.java | 159 +++++++++++++++++++
 .../cassandra/locator/CloudstackSnitchTest.java |   4 +-
 .../apache/cassandra/locator/EC2SnitchTest.java |   4 +-
 .../locator/GoogleCloudSnitchTest.java          |   4 +-
 9 files changed, 283 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 0e6985a..931da8d 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -18,7 +18,11 @@
 package org.apache.cassandra.gms;
 
 import java.io.*;
+import java.util.Collections;
+import java.util.EnumMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,8 +31,6 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
 /**
  * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
  * instance. Any state for a given endpoint can be retrieved from this instance.
@@ -42,7 +44,7 @@ public class EndpointState
     public final static IVersionedSerializer<EndpointState> serializer = new EndpointStateSerializer();
 
     private volatile HeartBeatState hbState;
-    final Map<ApplicationState, VersionedValue> applicationState = new NonBlockingHashMap<ApplicationState, VersionedValue>();
+    private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState;
 
     /* fields below do not get serialized */
     private volatile long updateTimestamp;
@@ -50,7 +52,13 @@ public class EndpointState
 
     EndpointState(HeartBeatState initialHbState)
     {
+        this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
+    }
+
+    EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states)
+    {
         hbState = initialHbState;
+        applicationState = new AtomicReference<Map<ApplicationState, VersionedValue>>(new EnumMap<>(states));
         updateTimestamp = System.nanoTime();
         isAlive = true;
     }
@@ -68,21 +76,37 @@ public class EndpointState
 
     public VersionedValue getApplicationState(ApplicationState key)
     {
-        return applicationState.get(key);
+        return applicationState.get().get(key);
     }
 
-    /**
-     * TODO replace this with operations that don't expose private state
-     */
-    @Deprecated
-    public Map<ApplicationState, VersionedValue> getApplicationStateMap()
+    public Set<Map.Entry<ApplicationState, VersionedValue>> states()
+    {
+        return applicationState.get().entrySet();
+    }
+
+    public void addApplicationState(ApplicationState key, VersionedValue value)
     {
-        return applicationState;
+        addApplicationStates(Collections.singletonMap(key, value));
     }
 
-    void addApplicationState(ApplicationState key, VersionedValue value)
+    public void addApplicationStates(Map<ApplicationState, VersionedValue> values)
     {
-        applicationState.put(key, value);
+        addApplicationStates(values.entrySet());
+    }
+
+    public void addApplicationStates(Set<Map.Entry<ApplicationState, VersionedValue>> values)
+    {
+        while (true)
+        {
+            Map<ApplicationState, VersionedValue> orig = applicationState.get();
+            Map<ApplicationState, VersionedValue> copy = new EnumMap<>(orig);
+
+            for (Map.Entry<ApplicationState, VersionedValue> value : values)
+                copy.put(value.getKey(), value.getValue());
+
+            if (applicationState.compareAndSet(orig, copy))
+                return;
+        }
     }
 
     /* getters and setters */
@@ -133,7 +157,7 @@ public class EndpointState
 
     public String toString()
     {
-        return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState;
+        return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
     }
 }
 
@@ -146,12 +170,12 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState>
         HeartBeatState.serializer.serialize(hbState, out, version);
 
         /* serialize the map of ApplicationState objects */
-        int size = epState.applicationState.size();
-        out.writeInt(size);
-        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet())
+        Set<Map.Entry<ApplicationState, VersionedValue>> states = epState.states();
+        out.writeInt(states.size());
+        for (Map.Entry<ApplicationState, VersionedValue> state : states)
         {
-            VersionedValue value = entry.getValue();
-            out.writeInt(entry.getKey().ordinal());
+            VersionedValue value = state.getValue();
+            out.writeInt(state.getKey().ordinal());
             VersionedValue.serializer.serialize(value, out, version);
         }
     }
@@ -159,26 +183,28 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState>
     public EndpointState deserialize(DataInput in, int version) throws IOException
     {
         HeartBeatState hbState = HeartBeatState.serializer.deserialize(in, version);
-        EndpointState epState = new EndpointState(hbState);
 
         int appStateSize = in.readInt();
+        Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
         for (int i = 0; i < appStateSize; ++i)
         {
             int key = in.readInt();
             VersionedValue value = VersionedValue.serializer.deserialize(in, version);
-            epState.addApplicationState(Gossiper.STATES[key], value);
+            states.put(Gossiper.STATES[key], value);
         }
-        return epState;
+
+        return new EndpointState(hbState, states);
     }
 
     public long serializedSize(EndpointState epState, int version)
     {
         long size = HeartBeatState.serializer.serializedSize(epState.getHeartBeatState(), version);
-        size += TypeSizes.NATIVE.sizeof(epState.applicationState.size());
-        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet())
+        Set<Map.Entry<ApplicationState, VersionedValue>> states = epState.states();
+        size += TypeSizes.NATIVE.sizeof(states.size());
+        for (Map.Entry<ApplicationState, VersionedValue> state : states)
         {
-            VersionedValue value = entry.getValue();
-            size += TypeSizes.NATIVE.sizeof(entry.getKey().ordinal());
+            VersionedValue value = state.getValue();
+            size += TypeSizes.NATIVE.sizeof(state.getKey().ordinal());
             size += VersionedValue.serializer.serializedSize(value, version);
         }
         return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index c563872..a0754b1 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -192,15 +192,16 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
     {
         sb.append("  generation:").append(endpointState.getHeartBeatState().getGeneration()).append("\n");
         sb.append("  heartbeat:").append(endpointState.getHeartBeatState().getHeartBeatVersion()).append("\n");
-        for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.applicationState.entrySet())
+        for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.states())
         {
             if (state.getKey() == ApplicationState.TOKENS)
                 continue;
             sb.append("  ").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n");
         }
-        if (endpointState.applicationState.containsKey(ApplicationState.TOKENS))
+        VersionedValue tokens = endpointState.getApplicationState(ApplicationState.TOKENS);
+        if (tokens != null)
         {
-            sb.append("  TOKENS:").append(endpointState.applicationState.get(ApplicationState.TOKENS).version).append(":<hidden>\n");
+            sb.append("  TOKENS:").append(tokens.version).append(":<hidden>\n");
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index f78dc7a..86fdab2 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -224,7 +224,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 return true;
             try
             {
-                if (entry.getValue().getApplicationStateMap().containsKey(ApplicationState.INTERNAL_IP) && seeds.contains(InetAddress.getByName(entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP).value)))
+                VersionedValue internalIp = entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP);
+                if (internalIp != null && seeds.contains(InetAddress.getByName(internalIp.value)))
                     return true;
             }
             catch (UnknownHostException e)
@@ -371,8 +372,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     int getMaxEndpointStateVersion(EndpointState epState)
     {
         int maxVersion = epState.getHeartBeatState().getHeartBeatVersion();
-        for (VersionedValue value : epState.getApplicationStateMap().values())
-            maxVersion = Math.max(maxVersion, value.version);
+        for (Map.Entry<ApplicationState, VersionedValue> state : epState.states())
+            maxVersion = Math.max(maxVersion, state.getValue().version);
         return maxVersion;
     }
 
@@ -525,8 +526,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         logger.info("Advertising removal for {}", endpoint);
         epState.updateTimestamp(); // make sure we don't evict it too soon
         epState.getHeartBeatState().forceNewerGenerationUnsafe();
-        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
-        epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId));
+        Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+        states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
+        states.put(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId));
+        epState.addApplicationStates(states);
         endpointStateMap.put(endpoint, epState);
     }
 
@@ -853,7 +856,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                     logger.trace("local heartbeat version {} greater than {} for {}", localHbVersion, version, forEndpoint);
             }
             /* Accumulate all application states whose versions are greater than "version" variable */
-            for (Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+            Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+            for (Entry<ApplicationState, VersionedValue> entry : epState.states())
             {
                 VersionedValue value = entry.getValue();
                 if (value.version > version)
@@ -865,9 +869,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                     final ApplicationState key = entry.getKey();
                     if (logger.isTraceEnabled())
                         logger.trace("Adding state {}: {}" , key, value.value);
-                    reqdEndpointState.addApplicationState(key, value);
+
+                    states.put(key, value);
                 }
             }
+            reqdEndpointState.addApplicationStates(states);
         }
         return reqdEndpointState;
     }
@@ -1147,19 +1153,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         localState.setHeartBeatState(remoteState.getHeartBeatState());
         if (logger.isTraceEnabled())
             logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr);
-        // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received
-        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
-        {
-            ApplicationState remoteKey = remoteEntry.getKey();
-            VersionedValue remoteValue = remoteEntry.getValue();
 
-            assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
-            localState.addApplicationState(remoteKey, remoteValue);
-        }
-        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
-        {
+        Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
+        assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
+        localState.addApplicationStates(remoteStates);
+
+        for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates)
             doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
-        }
     }
     
     // notify that a local application state is going to change (doesn't get triggered for remote changes)
@@ -1273,7 +1273,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public void start(int generationNumber)
     {
-        start(generationNumber, new HashMap<ApplicationState, VersionedValue>());
+        start(generationNumber, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
     }
 
     /**
@@ -1285,8 +1285,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         /* initialize the heartbeat state for this localEndpoint */
         maybeInitializeLocalState(generationNbr);
         EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
-        for (Map.Entry<ApplicationState, VersionedValue> entry : preloadLocalStates.entrySet())
-            localState.addApplicationState(entry.getKey(), entry.getValue());
+        localState.addApplicationStates(preloadLocalStates);
 
         //notify snitches that Gossiper is about to start
         DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
@@ -1475,8 +1474,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         EndpointState localState = oldState == null ? newState : oldState;
 
         // always add the version state
-        localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
-        localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+        Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+        states.put(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+        states.put(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+        localState.addApplicationStates(states);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index a142f41..3ea7bb4 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -109,6 +109,11 @@ public class VersionedValue implements Comparable<VersionedValue>
         return "Value(" + value + "," + version + ")";
     }
 
+    public byte[] toBytes()
+    {
+        return value.getBytes(ISO_8859_1);
+    }
+
     private static String versionString(String... args)
     {
         return StringUtils.join(args, VersionedValue.DELIMITER);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ad209fc..3ea261e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -521,9 +522,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
         try
         {
-            if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
+            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+            if (tokensVersionedValue == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
-            Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
+            Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 
             SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
             Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
@@ -740,7 +742,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         if (!joined)
         {
-            Map<ApplicationState, VersionedValue> appStates = new HashMap<>();
+            Map<ApplicationState, VersionedValue> appStates = new EnumMap<>(ApplicationState.class);
 
             if (replacing && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
                 throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
@@ -1655,8 +1657,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     handleStateBootstrap(endpoint);
                     break;
                 case VersionedValue.STATUS_NORMAL:
+                    handleStateNormal(endpoint, VersionedValue.STATUS_NORMAL);
+                    break;
                 case VersionedValue.SHUTDOWN:
-                    handleStateNormal(endpoint);
+                    handleStateNormal(endpoint, VersionedValue.SHUTDOWN);
                     break;
                 case VersionedValue.REMOVING_TOKEN:
                 case VersionedValue.REMOVED_TOKEN:
@@ -1738,7 +1742,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void updatePeerInfo(InetAddress endpoint)
     {
         EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
         {
             switch (entry.getKey())
             {
@@ -1771,12 +1775,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate)
-    {
-        String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value;
-        return vvalue.getBytes(ISO_8859_1);
-    }
-
     private void notifyRpcChange(InetAddress endpoint, boolean ready)
     {
         if (ready)
@@ -1846,7 +1844,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         try
         {
-            return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS))));
+            EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+            if (state == null)
+                return Collections.emptyList();
+
+            VersionedValue versionedValue = state.getApplicationState(ApplicationState.TOKENS);
+            if (versionedValue == null)
+                return Collections.emptyList();
+
+            return TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(versionedValue.toBytes())));
         }
         catch (IOException e)
         {
@@ -1895,22 +1901,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      *
      * @param endpoint node
      */
-    private void handleStateNormal(final InetAddress endpoint)
+    private void handleStateNormal(final InetAddress endpoint, final String status)
     {
-        Collection<Token> tokens;
-
-        tokens = getTokensFor(endpoint);
-
+        Collection<Token> tokens = getTokensFor(endpoint);
         Set<Token> tokensToUpdateInMetadata = new HashSet<>();
         Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
         Set<InetAddress> endpointsToRemove = new HashSet<>();
 
-
         if (logger.isDebugEnabled())
-            logger.debug("Node {} state normal, token {}", endpoint, tokens);
+            logger.debug("Node {} state {}, token {}", endpoint, status, tokens);
 
         if (tokenMetadata.isMember(endpoint))
-            logger.info("Node {} state jump to normal", endpoint);
+            logger.info("Node {} state jump to {}", endpoint, status);
+
+        if (tokens.isEmpty() && status.equals(VersionedValue.STATUS_NORMAL))
+            logger.error("Node {} is in state normal but it has no tokens, state: {}",
+                         endpoint,
+                         Gossiper.instance.getEndpointStateForEndpoint(endpoint));
 
         updatePeerInfo(endpoint);
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
@@ -2021,8 +2028,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     private void handleStateLeaving(InetAddress endpoint)
     {
-        Collection<Token> tokens;
-        tokens = getTokensFor(endpoint);
+        Collection<Token> tokens = getTokensFor(endpoint);
 
         if (logger.isDebugEnabled())
             logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
@@ -2056,16 +2062,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void handleStateLeft(InetAddress endpoint, String[] pieces)
     {
         assert pieces.length >= 2;
-        Collection<Token> tokens = null;
-        try
-        {
-            tokens = getTokensFor(endpoint);
-        }
-        catch (Throwable th)
-        {
-            JVMStabilityInspector.inspectThrowable(th);
-            logger.warn("Unable to calculate tokens for {}.", endpoint);
-        }
+        Collection<Token> tokens = getTokensFor(endpoint);
 
         if (logger.isDebugEnabled())
             logger.debug("Node {} state left, tokens {}", endpoint, tokens);
@@ -2154,7 +2151,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
         removeEndpoint(endpoint);
         tokenMetadata.removeEndpoint(endpoint);
-        if (tokens != null)
+        if (!tokens.isEmpty())
             tokenMetadata.removeBootstrapTokens(tokens);
 
         notifyLeft(endpoint);
@@ -2358,7 +2355,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void onJoin(InetAddress endpoint, EndpointState epState)
     {
-        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+        for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
         {
             onChange(endpoint, entry.getKey(), entry.getValue());
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/EndpointStateTest.java b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
new file mode 100644
index 0000000..b06c435
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class EndpointStateTest
+{
+    public volatile VersionedValue.VersionedValueFactory valueFactory =
+        new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+
+    @Test
+    public void testMultiThreadedReadConsistency() throws InterruptedException
+    {
+        for (int i = 0; i < 500; i++)
+            innerTestMultiThreadedReadConsistency();
+    }
+
+    /**
+     * Test that a thread reading values whilst they are updated by another thread will
+     * not see an entry unless it sees the entry previously added as well, even though
+     * we are accessing the map via an iterator backed by the underlying map. This
+     * works because EndpointState copies the map each time values are added.
+     */
+    private void innerTestMultiThreadedReadConsistency() throws InterruptedException
+    {
+        final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+        final List<Token> tokens = Collections.singletonList(token);
+        final HeartBeatState hb = new HeartBeatState(0);
+        final EndpointState state = new EndpointState(hb);
+        final AtomicInteger numFailures = new AtomicInteger();
+
+        Thread t1 = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+                state.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens));
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                for (int i = 0; i < 50; i++)
+                {
+                    Map<ApplicationState, VersionedValue> values = new EnumMap<>(ApplicationState.class);
+                    for (Map.Entry<ApplicationState, VersionedValue> entry : state.states())
+                        values.put(entry.getKey(), entry.getValue());
+
+                    if (values.containsKey(ApplicationState.STATUS) && !values.containsKey(ApplicationState.TOKENS))
+                    {
+                        numFailures.incrementAndGet();
+                        System.out.println(String.format("Failed: %s", values));
+                    }
+                }
+            }
+        });
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        assertTrue(numFailures.get() == 0);
+    }
+
+    @Test
+    public void testMultiThreadWriteConsistency() throws InterruptedException
+    {
+        for (int i = 0; i < 500; i++)
+            innerTestMultiThreadWriteConsistency();
+    }
+
+    /**
+     * Test that two threads can update the state map concurrently.
+     */
+    private void innerTestMultiThreadWriteConsistency() throws InterruptedException
+    {
+        final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+        final List<Token> tokens = Collections.singletonList(token);
+        final String ip = "127.0.0.1";
+        final UUID hostId = UUID.randomUUID();
+        final HeartBeatState hb = new HeartBeatState(0);
+        final EndpointState state = new EndpointState(hb);
+
+        Thread t1 = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+                states.put(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+                states.put(ApplicationState.STATUS, valueFactory.normal(tokens));
+                state.addApplicationStates(states);
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable()
+        {
+            public void run()
+            {
+                Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+                states.put(ApplicationState.INTERNAL_IP, valueFactory.internalIP(ip));
+                states.put(ApplicationState.HOST_ID, valueFactory.hostId(hostId));
+                state.addApplicationStates(states);
+            }
+        });
+
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+
+        Set<Map.Entry<ApplicationState, VersionedValue>> states = state.states();
+        assertEquals(4, states.size());
+
+        Map<ApplicationState, VersionedValue> values = new EnumMap<>(ApplicationState.class);
+        for (Map.Entry<ApplicationState, VersionedValue> entry : states)
+            values.put(entry.getKey(), entry.getValue());
+
+        assertTrue(values.containsKey(ApplicationState.STATUS));
+        assertTrue(values.containsKey(ApplicationState.TOKENS));
+        assertTrue(values.containsKey(ApplicationState.INTERNAL_IP));
+        assertTrue(values.containsKey(ApplicationState.HOST_ID));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
index d9a4ef1..90e63e0 100644
--- a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.locator;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.EnumMap;
 import java.util.Map;
 
 import org.junit.AfterClass;
@@ -77,9 +78,10 @@ public class CloudstackSnitchTest
         InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
 
         Gossiper.instance.addSavedEndpoint(nonlocal);
-        Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+        Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
         stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("ch-zrh"));
         stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.rack("2"));
+        Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
 
         assertEquals("ch-zrh", snitch.getDatacenter(nonlocal));
         assertEquals("2", snitch.getRack(nonlocal));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
index 6015adf..56bbb77 100644
--- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
@@ -24,6 +24,7 @@ package org.apache.cassandra.locator;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.EnumMap;
 import java.util.Map;
 
 import org.junit.AfterClass;
@@ -79,9 +80,10 @@ public class EC2SnitchTest
         InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
 
         Gossiper.instance.addSavedEndpoint(nonlocal);
-        Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+        Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
         stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("us-west"));
         stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("1a"));
+        Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
 
         assertEquals("us-west", snitch.getDatacenter(nonlocal));
         assertEquals("1a", snitch.getRack(nonlocal));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bb6bb00/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
index 54ea722..1521454 100644
--- a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.locator;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.EnumMap;
 import java.util.Map;
 
 import org.junit.AfterClass;
@@ -75,9 +76,10 @@ public class GoogleCloudSnitchTest
         InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
 
         Gossiper.instance.addSavedEndpoint(nonlocal);
-        Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+        Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
         stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("europe-west1"));
         stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("a"));
+        Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
 
         assertEquals("europe-west1", snitch.getDatacenter(nonlocal));
         assertEquals("a", snitch.getRack(nonlocal));