You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/11/11 21:09:05 UTC
[19/22] cassandra git commit: 10089 - 3.0 patch
10089 - 3.0 patch
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a90e989
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a90e989
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a90e989
Branch: refs/heads/trunk
Commit: 9a90e9894e9e079058876cf2b16a47d29ba0a32a
Parents: 30eecb2
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Wed Nov 11 15:05:35 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Nov 11 15:05:35 2015 -0500
----------------------------------------------------------------------
.../org/apache/cassandra/gms/EndpointState.java | 76 ++++++---
.../apache/cassandra/gms/FailureDetector.java | 7 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 47 +++---
.../apache/cassandra/gms/VersionedValue.java | 5 +
.../cassandra/service/StorageService.java | 61 ++++---
.../apache/cassandra/gms/EndpointStateTest.java | 159 +++++++++++++++++++
.../cassandra/locator/CloudstackSnitchTest.java | 4 +-
.../apache/cassandra/locator/EC2SnitchTest.java | 4 +-
.../locator/GoogleCloudSnitchTest.java | 4 +-
9 files changed, 282 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index d1c023a..70f2a68 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -18,7 +18,11 @@
package org.apache.cassandra.gms;
import java.io.*;
+import java.util.Collections;
+import java.util.EnumMap;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,8 +30,6 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
/**
* This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
* instance. Any state for a given endpoint can be retrieved from this instance.
@@ -41,7 +43,7 @@ public class EndpointState
public final static IVersionedSerializer<EndpointState> serializer = new EndpointStateSerializer();
private volatile HeartBeatState hbState;
- final Map<ApplicationState, VersionedValue> applicationState = new NonBlockingHashMap<ApplicationState, VersionedValue>();
+ private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState;
/* fields below do not get serialized */
private volatile long updateTimestamp;
@@ -49,7 +51,13 @@ public class EndpointState
EndpointState(HeartBeatState initialHbState)
{
+ this(initialHbState, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
+ }
+
+ EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states)
+ {
hbState = initialHbState;
+ applicationState = new AtomicReference<Map<ApplicationState, VersionedValue>>(new EnumMap<>(states));
updateTimestamp = System.nanoTime();
isAlive = true;
}
@@ -67,21 +75,37 @@ public class EndpointState
public VersionedValue getApplicationState(ApplicationState key)
{
- return applicationState.get(key);
+ return applicationState.get().get(key);
}
- /**
- * TODO replace this with operations that don't expose private state
- */
- @Deprecated
- public Map<ApplicationState, VersionedValue> getApplicationStateMap()
+ public Set<Map.Entry<ApplicationState, VersionedValue>> states()
+ {
+ return applicationState.get().entrySet();
+ }
+
+ public void addApplicationState(ApplicationState key, VersionedValue value)
{
- return applicationState;
+ addApplicationStates(Collections.singletonMap(key, value));
}
- void addApplicationState(ApplicationState key, VersionedValue value)
+ public void addApplicationStates(Map<ApplicationState, VersionedValue> values)
{
- applicationState.put(key, value);
+ addApplicationStates(values.entrySet());
+ }
+
+ public void addApplicationStates(Set<Map.Entry<ApplicationState, VersionedValue>> values)
+ {
+ while (true)
+ {
+ Map<ApplicationState, VersionedValue> orig = applicationState.get();
+ Map<ApplicationState, VersionedValue> copy = new EnumMap<>(orig);
+
+ for (Map.Entry<ApplicationState, VersionedValue> value : values)
+ copy.put(value.getKey(), value.getValue());
+
+ if (applicationState.compareAndSet(orig, copy))
+ return;
+ }
}
/* getters and setters */
@@ -132,7 +156,7 @@ public class EndpointState
public String toString()
{
- return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState;
+ return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
}
}
@@ -145,12 +169,12 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState>
HeartBeatState.serializer.serialize(hbState, out, version);
/* serialize the map of ApplicationState objects */
- int size = epState.applicationState.size();
- out.writeInt(size);
- for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet())
+ Set<Map.Entry<ApplicationState, VersionedValue>> states = epState.states();
+ out.writeInt(states.size());
+ for (Map.Entry<ApplicationState, VersionedValue> state : states)
{
- VersionedValue value = entry.getValue();
- out.writeInt(entry.getKey().ordinal());
+ VersionedValue value = state.getValue();
+ out.writeInt(state.getKey().ordinal());
VersionedValue.serializer.serialize(value, out, version);
}
}
@@ -158,26 +182,28 @@ class EndpointStateSerializer implements IVersionedSerializer<EndpointState>
public EndpointState deserialize(DataInputPlus in, int version) throws IOException
{
HeartBeatState hbState = HeartBeatState.serializer.deserialize(in, version);
- EndpointState epState = new EndpointState(hbState);
int appStateSize = in.readInt();
+ Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
for (int i = 0; i < appStateSize; ++i)
{
int key = in.readInt();
VersionedValue value = VersionedValue.serializer.deserialize(in, version);
- epState.addApplicationState(Gossiper.STATES[key], value);
+ states.put(Gossiper.STATES[key], value);
}
- return epState;
+
+ return new EndpointState(hbState, states);
}
public long serializedSize(EndpointState epState, int version)
{
long size = HeartBeatState.serializer.serializedSize(epState.getHeartBeatState(), version);
- size += TypeSizes.sizeof(epState.applicationState.size());
- for (Map.Entry<ApplicationState, VersionedValue> entry : epState.applicationState.entrySet())
+ Set<Map.Entry<ApplicationState, VersionedValue>> states = epState.states();
+ size += TypeSizes.sizeof(states.size());
+ for (Map.Entry<ApplicationState, VersionedValue> state : states)
{
- VersionedValue value = entry.getValue();
- size += TypeSizes.sizeof(entry.getKey().ordinal());
+ VersionedValue value = state.getValue();
+ size += TypeSizes.sizeof(state.getKey().ordinal());
size += VersionedValue.serializer.serializedSize(value, version);
}
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index c563872..a0754b1 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -192,15 +192,16 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
{
sb.append(" generation:").append(endpointState.getHeartBeatState().getGeneration()).append("\n");
sb.append(" heartbeat:").append(endpointState.getHeartBeatState().getHeartBeatVersion()).append("\n");
- for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.applicationState.entrySet())
+ for (Map.Entry<ApplicationState, VersionedValue> state : endpointState.states())
{
if (state.getKey() == ApplicationState.TOKENS)
continue;
sb.append(" ").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n");
}
- if (endpointState.applicationState.containsKey(ApplicationState.TOKENS))
+ VersionedValue tokens = endpointState.getApplicationState(ApplicationState.TOKENS);
+ if (tokens != null)
{
- sb.append(" TOKENS:").append(endpointState.applicationState.get(ApplicationState.TOKENS).version).append(":<hidden>\n");
+ sb.append(" TOKENS:").append(tokens.version).append(":<hidden>\n");
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 99c6755..795a22f 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -224,7 +224,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return true;
try
{
- if (entry.getValue().getApplicationStateMap().containsKey(ApplicationState.INTERNAL_IP) && seeds.contains(InetAddress.getByName(entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP).value)))
+ VersionedValue internalIp = entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP);
+ if (internalIp != null && seeds.contains(InetAddress.getByName(internalIp.value)))
return true;
}
catch (UnknownHostException e)
@@ -371,8 +372,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
int getMaxEndpointStateVersion(EndpointState epState)
{
int maxVersion = epState.getHeartBeatState().getHeartBeatVersion();
- for (VersionedValue value : epState.getApplicationStateMap().values())
- maxVersion = Math.max(maxVersion, value.version);
+ for (Map.Entry<ApplicationState, VersionedValue> state : epState.states())
+ maxVersion = Math.max(maxVersion, state.getValue().version);
return maxVersion;
}
@@ -525,8 +526,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
logger.info("Advertising removal for {}", endpoint);
epState.updateTimestamp(); // make sure we don't evict it too soon
epState.getHeartBeatState().forceNewerGenerationUnsafe();
- epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
- epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId));
+ Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+ states.put(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId));
+ states.put(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId));
+ epState.addApplicationStates(states);
endpointStateMap.put(endpoint, epState);
}
@@ -867,7 +870,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
logger.trace("local heartbeat version {} greater than {} for {}", localHbVersion, version, forEndpoint);
}
/* Accumulate all application states whose versions are greater than "version" variable */
- for (Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+ Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+ for (Entry<ApplicationState, VersionedValue> entry : epState.states())
{
VersionedValue value = entry.getValue();
if (value.version > version)
@@ -879,9 +883,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
final ApplicationState key = entry.getKey();
if (logger.isTraceEnabled())
logger.trace("Adding state {}: {}" , key, value.value);
- reqdEndpointState.addApplicationState(key, value);
+
+ states.put(key, value);
}
}
+ reqdEndpointState.addApplicationStates(states);
}
return reqdEndpointState;
}
@@ -1161,19 +1167,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
localState.setHeartBeatState(remoteState.getHeartBeatState());
if (logger.isTraceEnabled())
logger.trace("Updating heartbeat state version to {} from {} for {} ...", localState.getHeartBeatState().getHeartBeatVersion(), oldVersion, addr);
- // we need to make two loops here, one to apply, then another to notify, this way all states in an update are present and current when the notifications are received
- for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
- {
- ApplicationState remoteKey = remoteEntry.getKey();
- VersionedValue remoteValue = remoteEntry.getValue();
- assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
- localState.addApplicationState(remoteKey, remoteValue);
- }
- for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteState.getApplicationStateMap().entrySet())
- {
+ Set<Entry<ApplicationState, VersionedValue>> remoteStates = remoteState.states();
+ assert remoteState.getHeartBeatState().getGeneration() == localState.getHeartBeatState().getGeneration();
+ localState.addApplicationStates(remoteStates);
+
+ for (Entry<ApplicationState, VersionedValue> remoteEntry : remoteStates)
doOnChangeNotifications(addr, remoteEntry.getKey(), remoteEntry.getValue());
- }
}
// notify that a local application state is going to change (doesn't get triggered for remote changes)
@@ -1287,7 +1287,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
public void start(int generationNumber)
{
- start(generationNumber, new HashMap<ApplicationState, VersionedValue>());
+ start(generationNumber, new EnumMap<ApplicationState, VersionedValue>(ApplicationState.class));
}
/**
@@ -1299,8 +1299,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
/* initialize the heartbeat state for this localEndpoint */
maybeInitializeLocalState(generationNbr);
EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
- for (Map.Entry<ApplicationState, VersionedValue> entry : preloadLocalStates.entrySet())
- localState.addApplicationState(entry.getKey(), entry.getValue());
+ localState.addApplicationStates(preloadLocalStates);
//notify snitches that Gossiper is about to start
DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
@@ -1489,8 +1488,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
EndpointState localState = oldState == null ? newState : oldState;
// always add the version state
- localState.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
- localState.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+ Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+ states.put(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+ states.put(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+ localState.addApplicationStates(states);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 25f7706..0366320 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -108,6 +108,11 @@ public class VersionedValue implements Comparable<VersionedValue>
return "Value(" + value + "," + version + ")";
}
+ public byte[] toBytes()
+ {
+ return value.getBytes(ISO_8859_1);
+ }
+
private static String versionString(String... args)
{
return StringUtils.join(args, VersionedValue.DELIMITER);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 34df507..16dd045 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -504,12 +504,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
try
{
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
+ VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
- Collection<Token> tokens = TokenSerializer.deserialize(
- tokenMetadata.partitioner,
- new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(),
- ApplicationState.TOKENS))));
+ Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
@@ -746,7 +744,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (!joined)
{
- Map<ApplicationState, VersionedValue> appStates = new HashMap<>();
+ Map<ApplicationState, VersionedValue> appStates = new EnumMap<>(ApplicationState.class);
if (SystemKeyspace.wasDecommissioned())
{
@@ -1676,8 +1674,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
handleStateBootstrap(endpoint);
break;
case VersionedValue.STATUS_NORMAL:
+ handleStateNormal(endpoint, VersionedValue.STATUS_NORMAL);
+ break;
case VersionedValue.SHUTDOWN:
- handleStateNormal(endpoint);
+ handleStateNormal(endpoint, VersionedValue.SHUTDOWN);
break;
case VersionedValue.REMOVING_TOKEN:
case VersionedValue.REMOVED_TOKEN:
@@ -1759,7 +1759,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void updatePeerInfo(InetAddress endpoint)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
- for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+ for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
{
switch (entry.getKey())
{
@@ -1792,12 +1792,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- private byte[] getApplicationStateValue(InetAddress endpoint, ApplicationState appstate)
- {
- String vvalue = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(appstate).value;
- return vvalue.getBytes(ISO_8859_1);
- }
-
private void notifyRpcChange(InetAddress endpoint, boolean ready)
{
if (ready)
@@ -1867,9 +1861,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
try
{
- return TokenSerializer.deserialize(
- tokenMetadata.partitioner,
- new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(endpoint, ApplicationState.TOKENS))));
+ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+ if (state == null)
+ return Collections.emptyList();
+
+ VersionedValue versionedValue = state.getApplicationState(ApplicationState.TOKENS);
+ if (versionedValue == null)
+ return Collections.emptyList();
+
+ return TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(versionedValue.toBytes())));
}
catch (IOException e)
{
@@ -1918,22 +1918,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*
* @param endpoint node
*/
- private void handleStateNormal(final InetAddress endpoint)
+ private void handleStateNormal(final InetAddress endpoint, final String status)
{
- Collection<Token> tokens;
-
- tokens = getTokensFor(endpoint);
-
+ Collection<Token> tokens = getTokensFor(endpoint);
Set<Token> tokensToUpdateInMetadata = new HashSet<>();
Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
Set<InetAddress> endpointsToRemove = new HashSet<>();
-
if (logger.isDebugEnabled())
- logger.debug("Node {} state normal, token {}", endpoint, tokens);
+ logger.debug("Node {} state {}, token {}", endpoint, status, tokens);
if (tokenMetadata.isMember(endpoint))
- logger.info("Node {} state jump to normal", endpoint);
+ logger.info("Node {} state jump to {}", endpoint, status);
+
+ if (tokens.isEmpty() && status.equals(VersionedValue.STATUS_NORMAL))
+ logger.error("Node {} is in state normal but it has no tokens, state: {}",
+ endpoint,
+ Gossiper.instance.getEndpointStateForEndpoint(endpoint));
updatePeerInfo(endpoint);
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
@@ -2044,8 +2045,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
*/
private void handleStateLeaving(InetAddress endpoint)
{
- Collection<Token> tokens;
- tokens = getTokensFor(endpoint);
+ Collection<Token> tokens = getTokensFor(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
@@ -2079,8 +2079,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void handleStateLeft(InetAddress endpoint, String[] pieces)
{
assert pieces.length >= 2;
- Collection<Token> tokens;
- tokens = getTokensFor(endpoint);
+ Collection<Token> tokens = getTokensFor(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node {} state left, tokens {}", endpoint, tokens);
@@ -2172,7 +2171,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
removeEndpoint(endpoint);
tokenMetadata.removeEndpoint(endpoint);
- if (tokens != null)
+ if (!tokens.isEmpty())
tokenMetadata.removeBootstrapTokens(tokens);
notifyLeft(endpoint);
PendingRangeCalculatorService.instance.update();
@@ -2375,7 +2374,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void onJoin(InetAddress endpoint, EndpointState epState)
{
- for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
+ for (Map.Entry<ApplicationState, VersionedValue> entry : epState.states())
{
onChange(endpoint, entry.getKey(), entry.getValue());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/EndpointStateTest.java b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
new file mode 100644
index 0000000..b06c435
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/EndpointStateTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class EndpointStateTest
+{
+ public volatile VersionedValue.VersionedValueFactory valueFactory =
+ new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner());
+
+ @Test
+ public void testMultiThreadedReadConsistency() throws InterruptedException
+ {
+ for (int i = 0; i < 500; i++)
+ innerTestMultiThreadedReadConsistency();
+ }
+
+ /**
+ * Test that a thread reading values whilst they are updated by another thread will
+ * not see an entry unless it sees the entry previously added as well, even though
+ * we are accessing the map via an iterator backed by the underlying map. This
+ * works because EndpointState copies the map each time values are added.
+ */
+ private void innerTestMultiThreadedReadConsistency() throws InterruptedException
+ {
+ final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+ final List<Token> tokens = Collections.singletonList(token);
+ final HeartBeatState hb = new HeartBeatState(0);
+ final EndpointState state = new EndpointState(hb);
+ final AtomicInteger numFailures = new AtomicInteger();
+
+ Thread t1 = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ state.addApplicationState(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+ state.addApplicationState(ApplicationState.STATUS, valueFactory.normal(tokens));
+ }
+ });
+
+ Thread t2 = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ Map<ApplicationState, VersionedValue> values = new EnumMap<>(ApplicationState.class);
+ for (Map.Entry<ApplicationState, VersionedValue> entry : state.states())
+ values.put(entry.getKey(), entry.getValue());
+
+ if (values.containsKey(ApplicationState.STATUS) && !values.containsKey(ApplicationState.TOKENS))
+ {
+ numFailures.incrementAndGet();
+ System.out.println(String.format("Failed: %s", values));
+ }
+ }
+ }
+ });
+
+ t1.start();
+ t2.start();
+
+ t1.join();
+ t2.join();
+
+ assertTrue(numFailures.get() == 0);
+ }
+
+ @Test
+ public void testMultiThreadWriteConsistency() throws InterruptedException
+ {
+ for (int i = 0; i < 500; i++)
+ innerTestMultiThreadWriteConsistency();
+ }
+
+ /**
+ * Test that two threads can update the state map concurrently.
+ */
+ private void innerTestMultiThreadWriteConsistency() throws InterruptedException
+ {
+ final Token token = DatabaseDescriptor.getPartitioner().getRandomToken();
+ final List<Token> tokens = Collections.singletonList(token);
+ final String ip = "127.0.0.1";
+ final UUID hostId = UUID.randomUUID();
+ final HeartBeatState hb = new HeartBeatState(0);
+ final EndpointState state = new EndpointState(hb);
+
+ Thread t1 = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+ states.put(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+ states.put(ApplicationState.STATUS, valueFactory.normal(tokens));
+ state.addApplicationStates(states);
+ }
+ });
+
+ Thread t2 = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
+ states.put(ApplicationState.INTERNAL_IP, valueFactory.internalIP(ip));
+ states.put(ApplicationState.HOST_ID, valueFactory.hostId(hostId));
+ state.addApplicationStates(states);
+ }
+ });
+
+ t1.start();
+ t2.start();
+
+ t1.join();
+ t2.join();
+
+ Set<Map.Entry<ApplicationState, VersionedValue>> states = state.states();
+ assertEquals(4, states.size());
+
+ Map<ApplicationState, VersionedValue> values = new EnumMap<>(ApplicationState.class);
+ for (Map.Entry<ApplicationState, VersionedValue> entry : states)
+ values.put(entry.getKey(), entry.getValue());
+
+ assertTrue(values.containsKey(ApplicationState.STATUS));
+ assertTrue(values.containsKey(ApplicationState.TOKENS));
+ assertTrue(values.containsKey(ApplicationState.INTERNAL_IP));
+ assertTrue(values.containsKey(ApplicationState.HOST_ID));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
index 7881265..5ac1b31 100644
--- a/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/CloudstackSnitchTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.locator;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.EnumMap;
import java.util.Map;
import org.junit.AfterClass;
@@ -78,9 +79,10 @@ public class CloudstackSnitchTest
InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
Gossiper.instance.addSavedEndpoint(nonlocal);
- Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+ Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("ch-zrh"));
stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.rack("2"));
+ Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
assertEquals("ch-zrh", snitch.getDatacenter(nonlocal));
assertEquals("2", snitch.getRack(nonlocal));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
index cb30dc0..ca6f359 100644
--- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.locator;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.EnumMap;
import java.util.Map;
import org.junit.AfterClass;
@@ -77,9 +78,10 @@ public class EC2SnitchTest
InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
Gossiper.instance.addSavedEndpoint(nonlocal);
- Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+ Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("us-west"));
stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("1a"));
+ Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
assertEquals("us-west", snitch.getDatacenter(nonlocal));
assertEquals("1a", snitch.getRack(nonlocal));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a90e989/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
index fff880d..04b4361 100644
--- a/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/GoogleCloudSnitchTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.locator;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.EnumMap;
import java.util.Map;
import org.junit.AfterClass;
@@ -73,9 +74,10 @@ public class GoogleCloudSnitchTest
InetAddress nonlocal = InetAddress.getByName("127.0.0.7");
Gossiper.instance.addSavedEndpoint(nonlocal);
- Map<ApplicationState,VersionedValue> stateMap = Gossiper.instance.getEndpointStateForEndpoint(nonlocal).getApplicationStateMap();
+ Map<ApplicationState, VersionedValue> stateMap = new EnumMap<>(ApplicationState.class);
stateMap.put(ApplicationState.DC, StorageService.instance.valueFactory.datacenter("europe-west1"));
stateMap.put(ApplicationState.RACK, StorageService.instance.valueFactory.datacenter("a"));
+ Gossiper.instance.getEndpointStateForEndpoint(nonlocal).addApplicationStates(stateMap);
assertEquals("europe-west1", snitch.getDatacenter(nonlocal));
assertEquals("a", snitch.getRack(nonlocal));