You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2019/02/01 11:29:36 UTC
[cassandra] branch cassandra-3.0 updated: Severe concurrency issues
in STCS, DTCS, TWCS, TMD.Topology, TypeParser
This is an automated email from the ASF dual-hosted git repository.
snazy pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new bc18b4d Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
bc18b4d is described below
commit bc18b4dd4e33020d0d58c3701077d0af5c39bce6
Author: Robert Stupp <sn...@snazy.de>
AuthorDate: Wed Oct 31 12:48:19 2018 +0100
Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
patch by Robert Stupp; reviewed by Blake Eggleston for CASSANDRA-14781
---
.../org/apache/cassandra/db/ColumnFamilyStore.java | 1 -
.../db/compaction/AbstractCompactionStrategy.java | 14 +-
.../compaction/DateTieredCompactionStrategy.java | 12 +-
.../compaction/SizeTieredCompactionStrategy.java | 8 +-
.../compaction/TimeWindowCompactionStrategy.java | 4 +-
.../apache/cassandra/db/marshal/TypeParser.java | 30 +-
.../cassandra/locator/NetworkTopologyStrategy.java | 3 +-
.../apache/cassandra/locator/TokenMetadata.java | 354 ++++++++++++---------
.../cassandra/locator/TokenMetadataTest.java | 23 +-
9 files changed, 269 insertions(+), 180 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3482909..355d710 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2138,7 +2138,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
assert data.getCompacting().isEmpty() : data.getCompacting();
Iterable<SSTableReader> sstables = getLiveSSTables();
sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
- sstables = ImmutableList.copyOf(sstables);
LifecycleTransaction modifier = data.tryModify(sstables, operationType);
assert modifier != null: "something marked things compacting while compactions are disabled";
return modifier;
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 9f07691..2348d19 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -252,15 +252,15 @@ public abstract class AbstractCompactionStrategy
* @param originalCandidates The collection to check for blacklisted SSTables
* @return list of the SSTables with blacklisted ones filtered out
*/
- public static Iterable<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
+ public static List<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
{
- return Iterables.filter(originalCandidates, new Predicate<SSTableReader>()
+ List<SSTableReader> filtered = new ArrayList<>();
+ for (SSTableReader sstable : originalCandidates)
{
- public boolean apply(SSTableReader sstable)
- {
- return !sstable.isMarkedSuspect();
- }
- });
+ if (!sstable.isMarkedSuspect())
+ filtered.add(sstable);
+ }
+ return filtered;
}
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 3e6ae61..7c38fa8 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -87,7 +87,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
* @param gcBefore
* @return
*/
- private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+ private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
return Collections.emptyList();
@@ -193,11 +193,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
});
}
- /**
- *
- * @param sstables
- * @return
- */
public static List<Pair<SSTableReader, Long>> createSSTableAndMinTimestampPairs(Iterable<SSTableReader> sstables)
{
List<Pair<SSTableReader, Long>> sstableMinTimestampPairs = Lists.newArrayListWithCapacity(Iterables.size(sstables));
@@ -205,14 +200,15 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
sstableMinTimestampPairs.add(Pair.create(sstable, sstable.getMinTimestamp()));
return sstableMinTimestampPairs;
}
+
@Override
- public void addSSTable(SSTableReader sstable)
+ public synchronized void addSSTable(SSTableReader sstable)
{
sstables.add(sstable);
}
@Override
- public void removeSSTable(SSTableReader sstable)
+ public synchronized void removeSSTable(SSTableReader sstable)
{
sstables.remove(sstable);
}
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index f8a8240..80f5e8c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -74,7 +74,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
this.sizeTieredOptions = new SizeTieredCompactionStrategyOptions(options);
}
- private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+ private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
// make local copies so they can't be changed out from under us mid-method
int minThreshold = cfs.getMinimumCompactionThreshold();
@@ -190,7 +190,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
}
@SuppressWarnings("resource")
- public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
+ public synchronized Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
{
Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
if (Iterables.isEmpty(filteredSSTables))
@@ -316,13 +316,13 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
}
@Override
- public void addSSTable(SSTableReader added)
+ public synchronized void addSSTable(SSTableReader added)
{
sstables.add(added);
}
@Override
- public void removeSSTable(SSTableReader sstable)
+ public synchronized void removeSSTable(SSTableReader sstable)
{
sstables.remove(sstable);
}
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 8d26d0c..c44d3aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -169,13 +169,13 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
}
@Override
- public void addSSTable(SSTableReader sstable)
+ public synchronized void addSSTable(SSTableReader sstable)
{
sstables.add(sstable);
}
@Override
- public void removeSSTable(SSTableReader sstable)
+ public synchronized void removeSSTable(SSTableReader sstable)
{
sstables.remove(sstable);
}
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index 35d15ab..590eea3 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -23,6 +23,9 @@ import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableMap;
+
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -37,7 +40,7 @@ public class TypeParser
private int idx;
// A cache of parsed string, specially useful for DynamicCompositeType
- private static final Map<String, AbstractType<?>> cache = new HashMap<>();
+ private static volatile ImmutableMap<String, AbstractType<?>> cache = ImmutableMap.of();
public static final TypeParser EMPTY_PARSER = new TypeParser("", 0);
@@ -60,6 +63,7 @@ public class TypeParser
if (str == null)
return BytesType.instance;
+ // A single volatile read of 'cache' should not hurt.
AbstractType<?> type = cache.get(str);
if (type != null)
@@ -83,9 +87,27 @@ public class TypeParser
else
type = getAbstractType(name);
- // We don't really care about concurrency here. Worst case scenario, we do some parsing unnecessarily
- cache.put(str, type);
- return type;
+ Verify.verify(type != null, "Parsing %s yielded null, which is a bug", str);
+
+ // Prevent concurrent modification to the map acting as the cache for TypeParser at the expense of
+ // more allocation when the cache needs to be updated, since updates to the cache are rare compared
+ // to the amount of reads.
+ //
+ // Copy the existing cache into a new map and add the parsed AbstractType instance and replace
+ // the cache, if the type is not already in the cache.
+ //
+ // The cache-update is done in a short synchronized block to prevent duplicate instances of AbstractType
+ // for the same string representation.
+ synchronized (TypeParser.class)
+ {
+ if (!cache.containsKey(str))
+ {
+ ImmutableMap.Builder<String, AbstractType<?>> builder = ImmutableMap.builder();
+ builder.putAll(cache).put(str, type);
+ cache = builder.build();
+ }
+ return type;
+ }
}
public static AbstractType<?> parse(CharSequence compareWith) throws SyntaxException, ConfigurationException
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 7c8d95e..82183bb 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata.Topology;
import org.apache.cassandra.utils.FBUtilities;
+import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
/**
@@ -91,7 +92,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
// all endpoints in each DC, so we can check when we have exhausted all the members of a DC
Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
// all racks in a DC so we can check when we have exhausted all racks in a DC
- Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
// tracks the racks we have already placed replicas in
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b44a1a1..3978eeb 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -95,9 +95,10 @@ public class TokenMetadata
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
- private volatile ArrayList<Token> sortedTokens;
+ private volatile ArrayList<Token> sortedTokens; // safe to be read without a lock, as it's never mutated
+
+ private volatile Topology topology;
- private final Topology topology;
public final IPartitioner partitioner;
private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
@@ -115,7 +116,7 @@ public class TokenMetadata
{
this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
HashBiMap.<InetAddress, UUID>create(),
- new Topology(),
+ Topology.empty(),
DatabaseDescriptor.getPartitioner());
}
@@ -193,6 +194,7 @@ public class TokenMetadata
try
{
boolean shouldSortTokens = false;
+ Topology.Builder topologyBuilder = topology.unbuild();
for (InetAddress endpoint : endpointTokens.keySet())
{
Collection<Token> tokens = endpointTokens.get(endpoint);
@@ -201,7 +203,7 @@ public class TokenMetadata
bootstrapTokens.removeValue(endpoint);
tokenToEndpointMap.removeValue(endpoint);
- topology.addEndpoint(endpoint);
+ topologyBuilder.addEndpoint(endpoint);
leavingEndpoints.remove(endpoint);
replacementToOriginal.remove(endpoint);
removeFromMoving(endpoint); // also removing this endpoint from moving
@@ -217,6 +219,7 @@ public class TokenMetadata
}
}
}
+ topology = topologyBuilder.build();
if (shouldSortTokens)
sortedTokens = sortTokens();
@@ -381,12 +384,28 @@ public class TokenMetadata
public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
{
- return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+ lock.readLock().lock();
+ try
+ {
+ return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
}
public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
{
- return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+ lock.readLock().lock();
+ try
+ {
+ return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
}
public void removeBootstrapTokens(Collection<Token> tokens)
@@ -430,7 +449,6 @@ public class TokenMetadata
assert endpoint != null;
lock.writeLock().lock();
-
try
{
movingEndpoints.add(Pair.create(token, endpoint));
@@ -450,7 +468,7 @@ public class TokenMetadata
{
bootstrapTokens.removeValue(endpoint);
tokenToEndpointMap.removeValue(endpoint);
- topology.removeEndpoint(endpoint);
+ topology = topology.unbuild().removeEndpoint(endpoint).build();
leavingEndpoints.remove(endpoint);
if (replacementToOriginal.remove(endpoint) != null)
{
@@ -469,7 +487,7 @@ public class TokenMetadata
/**
* This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
*/
- public void updateTopology(InetAddress endpoint)
+ public Topology updateTopology(InetAddress endpoint)
{
assert endpoint != null;
@@ -477,8 +495,9 @@ public class TokenMetadata
try
{
logger.info("Updating topology for {}", endpoint);
- topology.updateEndpoint(endpoint);
+ topology = topology.unbuild().updateEndpoint(endpoint).build();
invalidateCachedRings();
+ return topology;
}
finally
{
@@ -490,14 +509,15 @@ public class TokenMetadata
* This is called when the snitch properties for many endpoints are updated, it will update
* the topology mappings of any endpoints whose snitch has changed, see CASSANDRA-10238.
*/
- public void updateTopology()
+ public Topology updateTopology()
{
lock.writeLock().lock();
try
{
logger.info("Updating topology for all endpoints that have changed");
- topology.updateEndpoints();
+ topology = topology.unbuild().updateEndpoints().build();
invalidateCachedRings();
+ return topology;
}
finally
{
@@ -590,7 +610,6 @@ public class TokenMetadata
assert endpoint != null;
lock.readLock().lock();
-
try
{
for (Pair<Token, InetAddress> pair : movingEndpoints)
@@ -620,7 +639,7 @@ public class TokenMetadata
{
return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
HashBiMap.create(endpointToHostIdMap),
- new Topology(topology),
+ topology,
partitioner);
}
finally
@@ -690,7 +709,6 @@ public class TokenMetadata
public TokenMetadata cloneAfterAllSettled()
{
lock.readLock().lock();
-
try
{
TokenMetadata metadata = cloneOnlyTokenMap();
@@ -807,50 +825,49 @@ public class TokenMetadata
public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
{
// avoid race between both branches - do not use a lock here as this will block any other unrelated operations!
+ long startedAt = System.currentTimeMillis();
synchronized (pendingRanges)
{
- if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
- {
- if (logger.isTraceEnabled())
- logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
+ // create clone of current state
+ BiMultiValMap<Token, InetAddress> bootstrapTokensClone;
+ Set<InetAddress> leavingEndpointsClone;
+ Set<Pair<Token, InetAddress>> movingEndpointsClone;
+ TokenMetadata metadata;
- pendingRanges.put(keyspaceName, new PendingRangeMaps());
- }
- else
+ lock.readLock().lock();
+ try
{
- if (logger.isDebugEnabled())
- logger.debug("Starting pending range calculation for {}", keyspaceName);
-
- long startedAt = System.currentTimeMillis();
+ if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
- // create clone of current state
- BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
- Set<InetAddress> leavingEndpoints = new HashSet<>();
- Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
- TokenMetadata metadata;
+ pendingRanges.put(keyspaceName, new PendingRangeMaps());
- lock.readLock().lock();
- try
- {
- bootstrapTokens.putAll(this.bootstrapTokens);
- leavingEndpoints.addAll(this.leavingEndpoints);
- movingEndpoints.addAll(this.movingEndpoints);
- metadata = this.cloneOnlyTokenMap();
- }
- finally
- {
- lock.readLock().unlock();
+ return;
}
- pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
- leavingEndpoints, movingEndpoints));
- long took = System.currentTimeMillis() - startedAt;
-
if (logger.isDebugEnabled())
- logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
- if (logger.isTraceEnabled())
- logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
+ logger.debug("Starting pending range calculation for {}", keyspaceName);
+
+ bootstrapTokensClone = new BiMultiValMap<>(this.bootstrapTokens);
+ leavingEndpointsClone = new HashSet<>(this.leavingEndpoints);
+ movingEndpointsClone = new HashSet<>(this.movingEndpoints);
+ metadata = this.cloneOnlyTokenMap();
}
+ finally
+ {
+ lock.readLock().unlock();
+ }
+
+ pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone,
+ leavingEndpointsClone, movingEndpointsClone));
+ long took = System.currentTimeMillis() - startedAt;
+
+ if (logger.isDebugEnabled())
+ logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+ if (logger.isTraceEnabled())
+ logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
}
}
@@ -960,7 +977,7 @@ public class TokenMetadata
{
List tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
- assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+ assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
}
@@ -968,17 +985,30 @@ public class TokenMetadata
{
List tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
- assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+ assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
}
+ private String tokenToEndpointMapKeysAsStrings()
+ {
+ lock.readLock().lock();
+ try
+ {
+ return StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
/** @return a copy of the bootstrapping tokens map */
public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
{
lock.readLock().lock();
try
{
- return new BiMultiValMap<Token, InetAddress>(bootstrapTokens);
+ return new BiMultiValMap<>(bootstrapTokens);
}
finally
{
@@ -1103,7 +1133,7 @@ public class TokenMetadata
pendingRanges.clear();
movingEndpoints.clear();
sortedTokens.clear();
- topology.clear();
+ topology = Topology.empty();
invalidateCachedRings();
}
finally
@@ -1271,130 +1301,170 @@ public class TokenMetadata
public static class Topology
{
/** multi-map of DC to endpoints in that DC */
- private final Multimap<String, InetAddress> dcEndpoints;
+ private final ImmutableMultimap<String, InetAddress> dcEndpoints;
/** map of DC to multi-map of rack to endpoints in that rack */
- private final Map<String, Multimap<String, InetAddress>> dcRacks;
+ private final ImmutableMap<String, ImmutableMultimap<String, InetAddress>> dcRacks;
/** reverse-lookup map for endpoint to current known dc/rack assignment */
- private final Map<InetAddress, Pair<String, String>> currentLocations;
+ private final ImmutableMap<InetAddress, Pair<String, String>> currentLocations;
- Topology()
+ private Topology(Builder builder)
{
- dcEndpoints = HashMultimap.create();
- dcRacks = new HashMap<>();
- currentLocations = new HashMap<>();
- }
+ this.dcEndpoints = ImmutableMultimap.copyOf(builder.dcEndpoints);
- void clear()
- {
- dcEndpoints.clear();
- dcRacks.clear();
- currentLocations.clear();
+ ImmutableMap.Builder<String, ImmutableMultimap<String, InetAddress>> dcRackBuilder = ImmutableMap.builder();
+ for (Map.Entry<String, Multimap<String, InetAddress>> entry : builder.dcRacks.entrySet())
+ dcRackBuilder.put(entry.getKey(), ImmutableMultimap.copyOf(entry.getValue()));
+ this.dcRacks = dcRackBuilder.build();
+
+ this.currentLocations = ImmutableMap.copyOf(builder.currentLocations);
}
/**
- * construct deep-copy of other
+ * @return multi-map of DC to endpoints in that DC
*/
- Topology(Topology other)
+ public Multimap<String, InetAddress> getDatacenterEndpoints()
{
- dcEndpoints = HashMultimap.create(other.dcEndpoints);
- dcRacks = new HashMap<>();
- for (String dc : other.dcRacks.keySet())
- dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
- currentLocations = new HashMap<>(other.currentLocations);
+ return dcEndpoints;
}
/**
- * Stores current DC/rack assignment for ep
+ * @return map of DC to multi-map of rack to endpoints in that rack
*/
- void addEndpoint(InetAddress ep)
+ public ImmutableMap<String, ImmutableMultimap<String, InetAddress>> getDatacenterRacks()
{
- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- String dc = snitch.getDatacenter(ep);
- String rack = snitch.getRack(ep);
- Pair<String, String> current = currentLocations.get(ep);
- if (current != null)
- {
- if (current.left.equals(dc) && current.right.equals(rack))
- return;
- doRemoveEndpoint(ep, current);
- }
-
- doAddEndpoint(ep, dc, rack);
+ return dcRacks;
}
- private void doAddEndpoint(InetAddress ep, String dc, String rack)
+ Builder unbuild()
{
- dcEndpoints.put(dc, ep);
-
- if (!dcRacks.containsKey(dc))
- dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
- dcRacks.get(dc).put(rack, ep);
-
- currentLocations.put(ep, Pair.create(dc, rack));
+ return new Builder(this);
}
- /**
- * Removes current DC/rack assignment for ep
- */
- void removeEndpoint(InetAddress ep)
+ static Builder builder()
{
- if (!currentLocations.containsKey(ep))
- return;
-
- doRemoveEndpoint(ep, currentLocations.remove(ep));
+ return new Builder();
}
- private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+ static Topology empty()
{
- dcRacks.get(current.left).remove(current.right, ep);
- dcEndpoints.remove(current.left, ep);
+ return builder().build();
}
- void updateEndpoint(InetAddress ep)
+ private static class Builder
{
- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- if (snitch == null || !currentLocations.containsKey(ep))
- return;
+ /** multi-map of DC to endpoints in that DC */
+ private final Multimap<String, InetAddress> dcEndpoints;
+ /** map of DC to multi-map of rack to endpoints in that rack */
+ private final Map<String, Multimap<String, InetAddress>> dcRacks;
+ /** reverse-lookup map for endpoint to current known dc/rack assignment */
+ private final Map<InetAddress, Pair<String, String>> currentLocations;
- updateEndpoint(ep, snitch);
- }
+ Builder()
+ {
+ this.dcEndpoints = HashMultimap.create();
+ this.dcRacks = new HashMap<>();
+ this.currentLocations = new HashMap<>();
+ }
- void updateEndpoints()
- {
- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- if (snitch == null)
- return;
+ Builder(Topology from)
+ {
+ this.dcEndpoints = HashMultimap.create(from.dcEndpoints);
+
+ this.dcRacks = Maps.newHashMapWithExpectedSize(from.dcRacks.size());
+ for (Map.Entry<String, ImmutableMultimap<String, InetAddress>> entry : from.dcRacks.entrySet())
+ dcRacks.put(entry.getKey(), HashMultimap.create(entry.getValue()));
+
+ this.currentLocations = new HashMap<>(from.currentLocations);
+ }
+
+ /**
+ * Stores current DC/rack assignment for ep
+ */
+ Builder addEndpoint(InetAddress ep)
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ String dc = snitch.getDatacenter(ep);
+ String rack = snitch.getRack(ep);
+ Pair<String, String> current = currentLocations.get(ep);
+ if (current != null)
+ {
+ if (current.left.equals(dc) && current.right.equals(rack))
+ return this;
+ doRemoveEndpoint(ep, current);
+ }
+
+ doAddEndpoint(ep, dc, rack);
+ return this;
+ }
+
+ private void doAddEndpoint(InetAddress ep, String dc, String rack)
+ {
+ dcEndpoints.put(dc, ep);
+
+ if (!dcRacks.containsKey(dc))
+ dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
+ dcRacks.get(dc).put(rack, ep);
+
+ currentLocations.put(ep, Pair.create(dc, rack));
+ }
+
+ /**
+ * Removes current DC/rack assignment for ep
+ */
+ Builder removeEndpoint(InetAddress ep)
+ {
+ if (!currentLocations.containsKey(ep))
+ return this;
+
+ doRemoveEndpoint(ep, currentLocations.remove(ep));
+ return this;
+ }
+
+ private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+ {
+ dcRacks.get(current.left).remove(current.right, ep);
+ dcEndpoints.remove(current.left, ep);
+ }
+
+ Builder updateEndpoint(InetAddress ep)
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ if (snitch == null || !currentLocations.containsKey(ep))
+ return this;
- for (InetAddress ep : currentLocations.keySet())
updateEndpoint(ep, snitch);
- }
+ return this;
+ }
- private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
- {
- Pair<String, String> current = currentLocations.get(ep);
- String dc = snitch.getDatacenter(ep);
- String rack = snitch.getRack(ep);
- if (dc.equals(current.left) && rack.equals(current.right))
- return;
+ Builder updateEndpoints()
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ if (snitch == null)
+ return this;
- doRemoveEndpoint(ep, current);
- doAddEndpoint(ep, dc, rack);
- }
+ for (InetAddress ep : currentLocations.keySet())
+ updateEndpoint(ep, snitch);
- /**
- * @return multi-map of DC to endpoints in that DC
- */
- public Multimap<String, InetAddress> getDatacenterEndpoints()
- {
- return dcEndpoints;
- }
+ return this;
+ }
- /**
- * @return map of DC to multi-map of rack to endpoints in that rack
- */
- public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
- {
- return dcRacks;
+ private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+ {
+ Pair<String, String> current = currentLocations.get(ep);
+ String dc = snitch.getDatacenter(ep);
+ String rack = snitch.getRack(ep);
+ if (dc.equals(current.left) && rack.equals(current.right))
+ return;
+
+ doRemoveEndpoint(ep, current);
+ doAddEndpoint(ep, dc, rack);
+ }
+
+ Topology build()
+ {
+ return new Topology(this);
+ }
}
+
}
}
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index e7bb70a..dab7082 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -22,6 +22,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Map;
+import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
@@ -29,18 +30,18 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
-import static junit.framework.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
-
-import static org.apache.cassandra.Util.token;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageService;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.Util.token;
+
@RunWith(OrderedJUnit4ClassRunner.class)
public class TokenMetadataTest
@@ -139,7 +140,7 @@ public class TokenMetadataTest
assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
- Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
assertNotNull(racks);
assertTrue(racks.size() == 1);
assertTrue(racks.containsKey(DATA_CENTER));
@@ -171,7 +172,7 @@ public class TokenMetadataTest
});
tokenMetadata.updateTopology(first);
- tokenMetadata.updateTopology(second);
+ topology = tokenMetadata.updateTopology(second);
allEndpoints = topology.getDatacenterEndpoints();
assertNotNull(allEndpoints);
@@ -237,7 +238,7 @@ public class TokenMetadataTest
assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
- Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
assertNotNull(racks);
assertTrue(racks.size() == 1);
assertTrue(racks.containsKey(DATA_CENTER));
@@ -268,7 +269,7 @@ public class TokenMetadataTest
}
});
- tokenMetadata.updateTopology();
+ topology = tokenMetadata.updateTopology();
allEndpoints = topology.getDatacenterEndpoints();
assertNotNull(allEndpoints);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org