You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2019/02/01 11:29:36 UTC

[cassandra] branch cassandra-3.0 updated: Severe concurrency issues in STCS, DTCS, TWCS, TMD.Topology, TypeParser

This is an automated email from the ASF dual-hosted git repository.

snazy pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new bc18b4d  Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
bc18b4d is described below

commit bc18b4dd4e33020d0d58c3701077d0af5c39bce6
Author: Robert Stupp <sn...@snazy.de>
AuthorDate: Wed Oct 31 12:48:19 2018 +0100

    Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
    
    patch by Robert Stupp; reviewed by Blake Eggleston for CASSANDRA-14781
---
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   1 -
 .../db/compaction/AbstractCompactionStrategy.java  |  14 +-
 .../compaction/DateTieredCompactionStrategy.java   |  12 +-
 .../compaction/SizeTieredCompactionStrategy.java   |   8 +-
 .../compaction/TimeWindowCompactionStrategy.java   |   4 +-
 .../apache/cassandra/db/marshal/TypeParser.java    |  30 +-
 .../cassandra/locator/NetworkTopologyStrategy.java |   3 +-
 .../apache/cassandra/locator/TokenMetadata.java    | 354 ++++++++++++---------
 .../cassandra/locator/TokenMetadataTest.java       |  23 +-
 9 files changed, 269 insertions(+), 180 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3482909..355d710 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2138,7 +2138,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 assert data.getCompacting().isEmpty() : data.getCompacting();
                 Iterable<SSTableReader> sstables = getLiveSSTables();
                 sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
-                sstables = ImmutableList.copyOf(sstables);
                 LifecycleTransaction modifier = data.tryModify(sstables, operationType);
                 assert modifier != null: "something marked things compacting while compactions are disabled";
                 return modifier;
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 9f07691..2348d19 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -252,15 +252,15 @@ public abstract class AbstractCompactionStrategy
      * @param originalCandidates The collection to check for blacklisted SSTables
      * @return list of the SSTables with blacklisted ones filtered out
      */
-    public static Iterable<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
+    public static List<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
     {
-        return Iterables.filter(originalCandidates, new Predicate<SSTableReader>()
+        List<SSTableReader> filtered = new ArrayList<>();
+        for (SSTableReader sstable : originalCandidates)
         {
-            public boolean apply(SSTableReader sstable)
-            {
-                return !sstable.isMarkedSuspect();
-            }
-        });
+            if (!sstable.isMarkedSuspect())
+                filtered.add(sstable);
+        }
+        return filtered;
     }
 
 
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 3e6ae61..7c38fa8 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -87,7 +87,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
      * @param gcBefore
      * @return
      */
-    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
         if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
             return Collections.emptyList();
@@ -193,11 +193,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         });
     }
 
-    /**
-     *
-     * @param sstables
-     * @return
-     */
     public static List<Pair<SSTableReader, Long>> createSSTableAndMinTimestampPairs(Iterable<SSTableReader> sstables)
     {
         List<Pair<SSTableReader, Long>> sstableMinTimestampPairs = Lists.newArrayListWithCapacity(Iterables.size(sstables));
@@ -205,14 +200,15 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
             sstableMinTimestampPairs.add(Pair.create(sstable, sstable.getMinTimestamp()));
         return sstableMinTimestampPairs;
     }
+
     @Override
-    public void addSSTable(SSTableReader sstable)
+    public synchronized void addSSTable(SSTableReader sstable)
     {
         sstables.add(sstable);
     }
 
     @Override
-    public void removeSSTable(SSTableReader sstable)
+    public synchronized void removeSSTable(SSTableReader sstable)
     {
         sstables.remove(sstable);
     }
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index f8a8240..80f5e8c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -74,7 +74,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         this.sizeTieredOptions = new SizeTieredCompactionStrategyOptions(options);
     }
 
-    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
         // make local copies so they can't be changed out from under us mid-method
         int minThreshold = cfs.getMinimumCompactionThreshold();
@@ -190,7 +190,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @SuppressWarnings("resource")
-    public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
+    public synchronized Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
     {
         Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
         if (Iterables.isEmpty(filteredSSTables))
@@ -316,13 +316,13 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @Override
-    public void addSSTable(SSTableReader added)
+    public synchronized void addSSTable(SSTableReader added)
     {
         sstables.add(added);
     }
 
     @Override
-    public void removeSSTable(SSTableReader sstable)
+    public synchronized void removeSSTable(SSTableReader sstable)
     {
         sstables.remove(sstable);
     }
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index 8d26d0c..c44d3aa 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -169,13 +169,13 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @Override
-    public void addSSTable(SSTableReader sstable)
+    public synchronized void addSSTable(SSTableReader sstable)
     {
         sstables.add(sstable);
     }
 
     @Override
-    public void removeSSTable(SSTableReader sstable)
+    public synchronized void removeSSTable(SSTableReader sstable)
     {
         sstables.remove(sstable);
     }
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index 35d15ab..590eea3 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -23,6 +23,9 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -37,7 +40,7 @@ public class TypeParser
     private int idx;
 
     // A cache of parsed string, specially useful for DynamicCompositeType
-    private static final Map<String, AbstractType<?>> cache = new HashMap<>();
+    private static volatile ImmutableMap<String, AbstractType<?>> cache = ImmutableMap.of();
 
     public static final TypeParser EMPTY_PARSER = new TypeParser("", 0);
 
@@ -60,6 +63,7 @@ public class TypeParser
         if (str == null)
             return BytesType.instance;
 
+        // A single volatile read of 'cache' should not hurt.
         AbstractType<?> type = cache.get(str);
 
         if (type != null)
@@ -83,9 +87,27 @@ public class TypeParser
         else
             type = getAbstractType(name);
 
-        // We don't really care about concurrency here. Worst case scenario, we do some parsing unnecessarily
-        cache.put(str, type);
-        return type;
+        Verify.verify(type != null, "Parsing %s yielded null, which is a bug", str);
+
+        // Prevent concurrent modification to the map acting as the cache for TypeParser at the expense of
+        // more allocation when the cache needs to be updated, since updates to the cache are rare compared
+        // to the amount of reads.
+        //
+        // Copy the existing cache into a new map and add the parsed AbstractType instance and replace
+        // the cache, if the type is not already in the cache.
+        //
+        // The cache-update is done in a short synchronized block so that concurrent copy-and-replace updates cannot
+        // lose each other; an entry already cached for a string is never replaced (nor re-added to the builder).
+        synchronized (TypeParser.class)
+        {
+            if (!cache.containsKey(str))
+            {
+                ImmutableMap.Builder<String, AbstractType<?>> builder = ImmutableMap.builder();
+                builder.putAll(cache).put(str, type);
+                cache = builder.build();
+            }
+            return type;
+        }
     }
 
     public static AbstractType<?> parse(CharSequence compareWith) throws SyntaxException, ConfigurationException
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 7c8d95e..82183bb 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
 import org.apache.cassandra.utils.FBUtilities;
 
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
 
 /**
@@ -91,7 +92,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
         Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
         // all racks in a DC so we can check when we have exhausted all racks in a DC
-        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
         assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
 
         // tracks the racks we have already placed replicas in
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b44a1a1..3978eeb 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -95,9 +95,10 @@ public class TokenMetadata
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
-    private volatile ArrayList<Token> sortedTokens;
+    private volatile ArrayList<Token> sortedTokens; // safe to be read without a lock, as it's never mutated
+
+    private volatile Topology topology;
 
-    private final Topology topology;
     public final IPartitioner partitioner;
 
     private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
@@ -115,7 +116,7 @@ public class TokenMetadata
     {
         this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
              HashBiMap.<InetAddress, UUID>create(),
-             new Topology(),
+             Topology.empty(),
              DatabaseDescriptor.getPartitioner());
     }
 
@@ -193,6 +194,7 @@ public class TokenMetadata
         try
         {
             boolean shouldSortTokens = false;
+            Topology.Builder topologyBuilder = topology.unbuild();
             for (InetAddress endpoint : endpointTokens.keySet())
             {
                 Collection<Token> tokens = endpointTokens.get(endpoint);
@@ -201,7 +203,7 @@ public class TokenMetadata
 
                 bootstrapTokens.removeValue(endpoint);
                 tokenToEndpointMap.removeValue(endpoint);
-                topology.addEndpoint(endpoint);
+                topologyBuilder.addEndpoint(endpoint);
                 leavingEndpoints.remove(endpoint);
                 replacementToOriginal.remove(endpoint);
                 removeFromMoving(endpoint); // also removing this endpoint from moving
@@ -217,6 +219,7 @@ public class TokenMetadata
                     }
                 }
             }
+            topology = topologyBuilder.build();
 
             if (shouldSortTokens)
                 sortedTokens = sortTokens();
@@ -381,12 +384,28 @@ public class TokenMetadata
 
     public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
     {
-        return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+        lock.readLock().lock();
+        try
+        {
+            return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
     }
 
     public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
     {
-        return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+        lock.readLock().lock();
+        try
+        {
+            return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
     }
 
     public void removeBootstrapTokens(Collection<Token> tokens)
@@ -430,7 +449,6 @@ public class TokenMetadata
         assert endpoint != null;
 
         lock.writeLock().lock();
-
         try
         {
             movingEndpoints.add(Pair.create(token, endpoint));
@@ -450,7 +468,7 @@ public class TokenMetadata
         {
             bootstrapTokens.removeValue(endpoint);
             tokenToEndpointMap.removeValue(endpoint);
-            topology.removeEndpoint(endpoint);
+            topology = topology.unbuild().removeEndpoint(endpoint).build();
             leavingEndpoints.remove(endpoint);
             if (replacementToOriginal.remove(endpoint) != null)
             {
@@ -469,7 +487,7 @@ public class TokenMetadata
     /**
      * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
      */
-    public void updateTopology(InetAddress endpoint)
+    public Topology updateTopology(InetAddress endpoint)
     {
         assert endpoint != null;
 
@@ -477,8 +495,9 @@ public class TokenMetadata
         try
         {
             logger.info("Updating topology for {}", endpoint);
-            topology.updateEndpoint(endpoint);
+            topology = topology.unbuild().updateEndpoint(endpoint).build();
             invalidateCachedRings();
+            return topology;
         }
         finally
         {
@@ -490,14 +509,15 @@ public class TokenMetadata
      * This is called when the snitch properties for many endpoints are updated, it will update
      * the topology mappings of any endpoints whose snitch has changed, see CASSANDRA-10238.
      */
-    public void updateTopology()
+    public Topology updateTopology()
     {
         lock.writeLock().lock();
         try
         {
             logger.info("Updating topology for all endpoints that have changed");
-            topology.updateEndpoints();
+            topology = topology.unbuild().updateEndpoints().build();
             invalidateCachedRings();
+            return topology;
         }
         finally
         {
@@ -590,7 +610,6 @@ public class TokenMetadata
         assert endpoint != null;
 
         lock.readLock().lock();
-
         try
         {
             for (Pair<Token, InetAddress> pair : movingEndpoints)
@@ -620,7 +639,7 @@ public class TokenMetadata
         {
             return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
                                      HashBiMap.create(endpointToHostIdMap),
-                                     new Topology(topology),
+                                     topology,
                                      partitioner);
         }
         finally
@@ -690,7 +709,6 @@ public class TokenMetadata
     public TokenMetadata cloneAfterAllSettled()
     {
         lock.readLock().lock();
-
         try
         {
             TokenMetadata metadata = cloneOnlyTokenMap();
@@ -807,50 +825,49 @@ public class TokenMetadata
     public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
     {
         // avoid race between both branches - do not use a lock here as this will block any other unrelated operations!
+        long startedAt = System.currentTimeMillis();
         synchronized (pendingRanges)
         {
-            if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
-            {
-                if (logger.isTraceEnabled())
-                    logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
+            // create clone of current state
+            BiMultiValMap<Token, InetAddress> bootstrapTokensClone;
+            Set<InetAddress> leavingEndpointsClone;
+            Set<Pair<Token, InetAddress>> movingEndpointsClone;
+            TokenMetadata metadata;
 
-                pendingRanges.put(keyspaceName, new PendingRangeMaps());
-            }
-            else
+            lock.readLock().lock();
+            try
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("Starting pending range calculation for {}", keyspaceName);
-
-                long startedAt = System.currentTimeMillis();
+                if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
 
-                // create clone of current state
-                BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
-                Set<InetAddress> leavingEndpoints = new HashSet<>();
-                Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
-                TokenMetadata metadata;
+                    pendingRanges.put(keyspaceName, new PendingRangeMaps());
 
-                lock.readLock().lock();
-                try
-                {
-                    bootstrapTokens.putAll(this.bootstrapTokens);
-                    leavingEndpoints.addAll(this.leavingEndpoints);
-                    movingEndpoints.addAll(this.movingEndpoints);
-                    metadata = this.cloneOnlyTokenMap();
-                }
-                finally
-                {
-                    lock.readLock().unlock();
+                    return;
                 }
 
-                pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
-                                                                       leavingEndpoints, movingEndpoints));
-                long took = System.currentTimeMillis() - startedAt;
-
                 if (logger.isDebugEnabled())
-                    logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
-                if (logger.isTraceEnabled())
-                    logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
+                    logger.debug("Starting pending range calculation for {}", keyspaceName);
+
+                bootstrapTokensClone  = new BiMultiValMap<>(this.bootstrapTokens);
+                leavingEndpointsClone = new HashSet<>(this.leavingEndpoints);
+                movingEndpointsClone = new HashSet<>(this.movingEndpoints);
+                metadata = this.cloneOnlyTokenMap();
             }
+            finally
+            {
+                lock.readLock().unlock();
+            }
+
+            pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone,
+                                                                   leavingEndpointsClone, movingEndpointsClone));
+            long took = System.currentTimeMillis() - startedAt;
+
+            if (logger.isDebugEnabled())
+                logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+            if (logger.isTraceEnabled())
+                logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
         }
     }
 
@@ -960,7 +977,7 @@ public class TokenMetadata
     {
         List tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
-        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+        assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
         return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
     }
 
@@ -968,17 +985,30 @@ public class TokenMetadata
     {
         List tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
-        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+        assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
         return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
     }
 
+    private String tokenToEndpointMapKeysAsStrings()
+    {
+        lock.readLock().lock();
+        try
+        {
+            return StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     /** @return a copy of the bootstrapping tokens map */
     public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
     {
         lock.readLock().lock();
         try
         {
-            return new BiMultiValMap<Token, InetAddress>(bootstrapTokens);
+            return new BiMultiValMap<>(bootstrapTokens);
         }
         finally
         {
@@ -1103,7 +1133,7 @@ public class TokenMetadata
             pendingRanges.clear();
             movingEndpoints.clear();
             sortedTokens.clear();
-            topology.clear();
+            topology = Topology.empty();
             invalidateCachedRings();
         }
         finally
@@ -1271,130 +1301,170 @@ public class TokenMetadata
     public static class Topology
     {
         /** multi-map of DC to endpoints in that DC */
-        private final Multimap<String, InetAddress> dcEndpoints;
+        private final ImmutableMultimap<String, InetAddress> dcEndpoints;
         /** map of DC to multi-map of rack to endpoints in that rack */
-        private final Map<String, Multimap<String, InetAddress>> dcRacks;
+        private final ImmutableMap<String, ImmutableMultimap<String, InetAddress>> dcRacks;
         /** reverse-lookup map for endpoint to current known dc/rack assignment */
-        private final Map<InetAddress, Pair<String, String>> currentLocations;
+        private final ImmutableMap<InetAddress, Pair<String, String>> currentLocations;
 
-        Topology()
+        private Topology(Builder builder)
         {
-            dcEndpoints = HashMultimap.create();
-            dcRacks = new HashMap<>();
-            currentLocations = new HashMap<>();
-        }
+            this.dcEndpoints = ImmutableMultimap.copyOf(builder.dcEndpoints);
 
-        void clear()
-        {
-            dcEndpoints.clear();
-            dcRacks.clear();
-            currentLocations.clear();
+            ImmutableMap.Builder<String, ImmutableMultimap<String, InetAddress>> dcRackBuilder = ImmutableMap.builder();
+            for (Map.Entry<String, Multimap<String, InetAddress>> entry : builder.dcRacks.entrySet())
+                dcRackBuilder.put(entry.getKey(), ImmutableMultimap.copyOf(entry.getValue()));
+            this.dcRacks = dcRackBuilder.build();
+
+            this.currentLocations = ImmutableMap.copyOf(builder.currentLocations);
         }
 
         /**
-         * construct deep-copy of other
+         * @return multi-map of DC to endpoints in that DC
          */
-        Topology(Topology other)
+        public Multimap<String, InetAddress> getDatacenterEndpoints()
         {
-            dcEndpoints = HashMultimap.create(other.dcEndpoints);
-            dcRacks = new HashMap<>();
-            for (String dc : other.dcRacks.keySet())
-                dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
-            currentLocations = new HashMap<>(other.currentLocations);
+            return dcEndpoints;
         }
 
         /**
-         * Stores current DC/rack assignment for ep
+         * @return map of DC to multi-map of rack to endpoints in that rack
          */
-        void addEndpoint(InetAddress ep)
+        public ImmutableMap<String, ImmutableMultimap<String, InetAddress>> getDatacenterRacks()
         {
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            String dc = snitch.getDatacenter(ep);
-            String rack = snitch.getRack(ep);
-            Pair<String, String> current = currentLocations.get(ep);
-            if (current != null)
-            {
-                if (current.left.equals(dc) && current.right.equals(rack))
-                    return;
-                doRemoveEndpoint(ep, current);
-            }
-
-            doAddEndpoint(ep, dc, rack);
+            return dcRacks;
         }
 
-        private void doAddEndpoint(InetAddress ep, String dc, String rack)
+        Builder unbuild()
         {
-            dcEndpoints.put(dc, ep);
-
-            if (!dcRacks.containsKey(dc))
-                dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
-            dcRacks.get(dc).put(rack, ep);
-
-            currentLocations.put(ep, Pair.create(dc, rack));
+            return new Builder(this);
         }
 
-        /**
-         * Removes current DC/rack assignment for ep
-         */
-        void removeEndpoint(InetAddress ep)
+        static Builder builder()
         {
-            if (!currentLocations.containsKey(ep))
-                return;
-
-            doRemoveEndpoint(ep, currentLocations.remove(ep));
+            return new Builder();
         }
 
-        private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+        static Topology empty()
         {
-            dcRacks.get(current.left).remove(current.right, ep);
-            dcEndpoints.remove(current.left, ep);
+            return builder().build();
         }
 
-        void updateEndpoint(InetAddress ep)
+        private static class Builder
         {
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            if (snitch == null || !currentLocations.containsKey(ep))
-                return;
+            /** multi-map of DC to endpoints in that DC */
+            private final Multimap<String, InetAddress> dcEndpoints;
+            /** map of DC to multi-map of rack to endpoints in that rack */
+            private final Map<String, Multimap<String, InetAddress>> dcRacks;
+            /** reverse-lookup map for endpoint to current known dc/rack assignment */
+            private final Map<InetAddress, Pair<String, String>> currentLocations;
 
-           updateEndpoint(ep, snitch);
-        }
+            Builder()
+            {
+                this.dcEndpoints = HashMultimap.create();
+                this.dcRacks = new HashMap<>();
+                this.currentLocations = new HashMap<>();
+            }
 
-        void updateEndpoints()
-        {
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            if (snitch == null)
-                return;
+            Builder(Topology from)
+            {
+                this.dcEndpoints = HashMultimap.create(from.dcEndpoints);
+
+                this.dcRacks = Maps.newHashMapWithExpectedSize(from.dcRacks.size());
+                for (Map.Entry<String, ImmutableMultimap<String, InetAddress>> entry : from.dcRacks.entrySet())
+                    dcRacks.put(entry.getKey(), HashMultimap.create(entry.getValue()));
+
+                this.currentLocations = new HashMap<>(from.currentLocations);
+            }
+
+            /**
+             * Stores current DC/rack assignment for ep
+             */
+            Builder addEndpoint(InetAddress ep)
+            {
+                IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                String dc = snitch.getDatacenter(ep);
+                String rack = snitch.getRack(ep);
+                Pair<String, String> current = currentLocations.get(ep);
+                if (current != null)
+                {
+                    if (current.left.equals(dc) && current.right.equals(rack))
+                        return this;
+                    doRemoveEndpoint(ep, current);
+                }
+
+                doAddEndpoint(ep, dc, rack);
+                return this;
+            }
+
+            private void doAddEndpoint(InetAddress ep, String dc, String rack)
+            {
+                dcEndpoints.put(dc, ep);
+
+                if (!dcRacks.containsKey(dc))
+                    dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
+                dcRacks.get(dc).put(rack, ep);
+
+                currentLocations.put(ep, Pair.create(dc, rack));
+            }
+
+            /**
+             * Removes current DC/rack assignment for ep
+             */
+            Builder removeEndpoint(InetAddress ep)
+            {
+                if (!currentLocations.containsKey(ep))
+                    return this;
+
+                doRemoveEndpoint(ep, currentLocations.remove(ep));
+                return this;
+            }
+
+            private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+            {
+                dcRacks.get(current.left).remove(current.right, ep);
+                dcEndpoints.remove(current.left, ep);
+            }
+
+            Builder updateEndpoint(InetAddress ep)
+            {
+                IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                if (snitch == null || !currentLocations.containsKey(ep))
+                    return this;
 
-            for (InetAddress ep : currentLocations.keySet())
                 updateEndpoint(ep, snitch);
-        }
+                return this;
+            }
 
-        private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
-        {
-            Pair<String, String> current = currentLocations.get(ep);
-            String dc = snitch.getDatacenter(ep);
-            String rack = snitch.getRack(ep);
-            if (dc.equals(current.left) && rack.equals(current.right))
-                return;
+            Builder updateEndpoints()
+            {
+                IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                if (snitch == null)
+                    return this;
 
-            doRemoveEndpoint(ep, current);
-            doAddEndpoint(ep, dc, rack);
-        }
+                for (InetAddress ep : currentLocations.keySet())
+                    updateEndpoint(ep, snitch);
 
-        /**
-         * @return multi-map of DC to endpoints in that DC
-         */
-        public Multimap<String, InetAddress> getDatacenterEndpoints()
-        {
-            return dcEndpoints;
-        }
+                return this;
+            }
 
-        /**
-         * @return map of DC to multi-map of rack to endpoints in that rack
-         */
-        public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
-        {
-            return dcRacks;
+            private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+            {
+                Pair<String, String> current = currentLocations.get(ep);
+                String dc = snitch.getDatacenter(ep);
+                String rack = snitch.getRack(ep);
+                if (dc.equals(current.left) && rack.equals(current.right))
+                    return;
+
+                doRemoveEndpoint(ep, current);
+                doAddEndpoint(ep, dc, rack);
+            }
+
+            Topology build()
+            {
+                return new Topology(this);
+            }
         }
+
     }
 }
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index e7bb70a..dab7082 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -22,6 +22,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Multimap;
 
@@ -29,18 +30,18 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import static junit.framework.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
-
-import static org.apache.cassandra.Util.token;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.Util.token;
+
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class TokenMetadataTest
@@ -139,7 +140,7 @@ public class TokenMetadataTest
         assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
         assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
 
-        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
         assertNotNull(racks);
         assertTrue(racks.size() == 1);
         assertTrue(racks.containsKey(DATA_CENTER));
@@ -171,7 +172,7 @@ public class TokenMetadataTest
         });
 
         tokenMetadata.updateTopology(first);
-        tokenMetadata.updateTopology(second);
+        topology = tokenMetadata.updateTopology(second);
 
         allEndpoints = topology.getDatacenterEndpoints();
         assertNotNull(allEndpoints);
@@ -237,7 +238,7 @@ public class TokenMetadataTest
         assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
         assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
 
-        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
         assertNotNull(racks);
         assertTrue(racks.size() == 1);
         assertTrue(racks.containsKey(DATA_CENTER));
@@ -268,7 +269,7 @@ public class TokenMetadataTest
             }
         });
 
-        tokenMetadata.updateTopology();
+        topology = tokenMetadata.updateTopology();
 
         allEndpoints = topology.getDatacenterEndpoints();
         assertNotNull(allEndpoints);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org