You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/16 12:49:07 UTC

[1/3] cassandra git commit: Fix consolidating racks violating the RF contract

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 e7c2952d3 -> 5b05b6826


Fix consolidating racks violating the RF contract

patch by Stefania Alborghetti; reviewed by Blake Eggleston for
CASSANDRA-10238


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/257cdaa0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/257cdaa0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/257cdaa0

Branch: refs/heads/cassandra-2.2
Commit: 257cdaa08dc12f747a25c03b9b0ad3ffc76ace9b
Parents: 0bb32f0
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Sep 11 16:31:40 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 16 11:40:31 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/locator/PropertyFileSnitch.java   |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 142 +++++++++----
 .../locator/YamlFileNetworkTopologySnitch.java  |   1 +
 .../cassandra/service/StorageService.java       |  16 ++
 .../cassandra/locator/TokenMetadataTest.java    | 209 ++++++++++++++++++-
 6 files changed, 332 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d4cc15f..3c47427 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.17
+ * Fix consolidating racks violating the RF contract (CASSANDRA-10238)
  * Disallow decommission when node is in drained state (CASSANDRA-8741)
  * Backport CASSANDRA-8013 to 2.0 (CASSANDRA-10144)
  * Make getFullyExpiredSSTables less expensive (CASSANDRA-9882)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 4f822c6..745eeb8 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -69,6 +69,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
                 protected void runMayThrow() throws ConfigurationException
                 {
                     reloadConfiguration();
+                    StorageService.instance.updateTopology();
                 }
             };
             ResourceWatcher.watch(SNITCH_PROPERTIES_FILENAME, runnable, 60 * 1000);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index a673c94..b1b25e8 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -78,14 +78,14 @@ public class TokenMetadata
     // Finally, note that recording the tokens of joining nodes in bootstrapTokens also
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
-    private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<Token, InetAddress>();
+    private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
-    private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
+    private final Set<InetAddress> leavingEndpoints = new HashSet<>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
-    private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range<Token>, InetAddress>>();
+    private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<>();
 
     // nodes which are migrating to the new tokens in the ring
-    private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
+    private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
 
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -121,7 +121,7 @@ public class TokenMetadata
 
     private ArrayList<Token> sortTokens()
     {
-        return new ArrayList<Token>(tokenToEndpointMap.keySet());
+        return new ArrayList<>(tokenToEndpointMap.keySet());
     }
 
     /** @return the number of nodes bootstrapping into source's primary range */
@@ -165,8 +165,6 @@ public class TokenMetadata
      *
      * Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
      * is expensive (CASSANDRA-3831).
-     *
-     * @param endpointTokens
      */
     public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
     {
@@ -213,9 +211,6 @@ public class TokenMetadata
     /**
      * Store an end-point to host ID mapping.  Each ID must be unique, and
      * cannot be changed after the fact.
-     *
-     * @param hostId
-     * @param endpoint
      */
     public void updateHostId(UUID hostId, InetAddress endpoint)
     {
@@ -284,7 +279,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            Map<InetAddress, UUID> readMap = new HashMap<InetAddress, UUID>();
+            Map<InetAddress, UUID> readMap = new HashMap<>();
             readMap.putAll(endpointToHostIdMap);
             return readMap;
         }
@@ -407,6 +402,43 @@ public class TokenMetadata
     }
 
     /**
+     * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
+     */
+    public void updateTopology(InetAddress endpoint)
+    {
+        assert endpoint != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            logger.info("Updating topology for {}", endpoint);
+            topology.updateEndpoint(endpoint);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * This is called when the snitch properties for many endpoints are updated, it will update
+     * the topology mappings of any endpoints whose snitch has changed, see CASSANDRA-10238.
+     */
+    public void updateTopology()
+    {
+        lock.writeLock().lock();
+        try
+        {
+            logger.info("Updating topology for all endpoints that have changed");
+            topology.updateEndpoints();
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
      * Remove pair of token/address from moving endpoints
      * @param endpoint address of the moving node
      */
@@ -442,7 +474,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new ArrayList<Token>(tokenToEndpointMap.inverse().get(endpoint));
+            return new ArrayList<>(tokenToEndpointMap.inverse().get(endpoint));
         }
         finally
         {
@@ -508,7 +540,7 @@ public class TokenMetadata
         }
     }
 
-    private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
+    private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<>();
 
     /**
      * Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
@@ -519,7 +551,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
                                      HashBiMap.create(endpointToHostIdMap),
                                      new Topology(topology));
         }
@@ -622,9 +654,9 @@ public class TokenMetadata
 
     public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens)
     {
-        Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(tokens.size());
+        Collection<Range<Token>> ranges = new ArrayList<>(tokens.size());
         for (Token right : tokens)
-            ranges.add(new Range<Token>(getPredecessor(right), right));
+            ranges.add(new Range<>(getPredecessor(right), right));
         return ranges;
     }
 
@@ -660,7 +692,7 @@ public class TokenMetadata
 
     public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint)
     {
-        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+        List<Range<Token>> ranges = new ArrayList<>();
         for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(keyspaceName).entries())
         {
             if (entry.getValue().equals(endpoint))
@@ -845,7 +877,7 @@ public class TokenMetadata
                 for (InetAddress ep : eps)
                 {
                     sb.append(ep);
-                    sb.append(":");
+                    sb.append(':');
                     sb.append(tokenToEndpointMap.inverse().get(ep));
                     sb.append(System.getProperty("line.separator"));
                 }
@@ -857,7 +889,7 @@ public class TokenMetadata
                 sb.append(System.getProperty("line.separator"));
                 for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
                 {
-                    sb.append(entry.getValue()).append(":").append(entry.getKey());
+                    sb.append(entry.getValue()).append(':').append(entry.getKey());
                     sb.append(System.getProperty("line.separator"));
                 }
             }
@@ -896,7 +928,7 @@ public class TokenMetadata
         {
             for (Map.Entry<Range<Token>, InetAddress> rmap : entry.getValue().entries())
             {
-                sb.append(rmap.getValue()).append(":").append(rmap.getKey());
+                sb.append(rmap.getValue()).append(':').append(rmap.getKey());
                 sb.append(System.getProperty("line.separator"));
             }
         }
@@ -910,7 +942,7 @@ public class TokenMetadata
         if (ranges.isEmpty())
             return Collections.emptyList();
 
-        Set<InetAddress> endpoints = new HashSet<InetAddress>();
+        Set<InetAddress> endpoints = new HashSet<>();
         for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : ranges.entrySet())
         {
             if (entry.getKey().contains(token))
@@ -954,7 +986,7 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size() + bootstrapTokens.size());
+            Map<Token, InetAddress> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
             map.putAll(tokenToEndpointMap);
             map.putAll(bootstrapTokens);
             return map;
@@ -1001,14 +1033,14 @@ public class TokenMetadata
         /** reverse-lookup map for endpoint to current known dc/rack assignment */
         private final Map<InetAddress, Pair<String, String>> currentLocations;
 
-        protected Topology()
+        Topology()
         {
             dcEndpoints = HashMultimap.create();
-            dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
-            currentLocations = new HashMap<InetAddress, Pair<String, String>>();
+            dcRacks = new HashMap<>();
+            currentLocations = new HashMap<>();
         }
 
-        protected void clear()
+        void clear()
         {
             dcEndpoints.clear();
             dcRacks.clear();
@@ -1018,19 +1050,19 @@ public class TokenMetadata
         /**
          * construct deep-copy of other
          */
-        protected Topology(Topology other)
+        Topology(Topology other)
         {
             dcEndpoints = HashMultimap.create(other.dcEndpoints);
-            dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
+            dcRacks = new HashMap<>();
             for (String dc : other.dcRacks.keySet())
                 dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
-            currentLocations = new HashMap<InetAddress, Pair<String, String>>(other.currentLocations);
+            currentLocations = new HashMap<>(other.currentLocations);
         }
 
         /**
          * Stores current DC/rack assignment for ep
          */
-        protected void addEndpoint(InetAddress ep)
+        void addEndpoint(InetAddress ep)
         {
             IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
             String dc = snitch.getDatacenter(ep);
@@ -1040,10 +1072,14 @@ public class TokenMetadata
             {
                 if (current.left.equals(dc) && current.right.equals(rack))
                     return;
-                dcRacks.get(current.left).remove(current.right, ep);
-                dcEndpoints.remove(current.left, ep);
+                doRemoveEndpoint(ep, current);
             }
 
+            doAddEndpoint(ep, dc, rack);
+        }
+
+        private void doAddEndpoint(InetAddress ep, String dc, String rack)
+        {
             dcEndpoints.put(dc, ep);
 
             if (!dcRacks.containsKey(dc))
@@ -1056,13 +1092,49 @@ public class TokenMetadata
         /**
          * Removes current DC/rack assignment for ep
          */
-        protected void removeEndpoint(InetAddress ep)
+        void removeEndpoint(InetAddress ep)
         {
             if (!currentLocations.containsKey(ep))
                 return;
-            Pair<String, String> current = currentLocations.remove(ep);
-            dcEndpoints.remove(current.left, ep);
+
+            doRemoveEndpoint(ep, currentLocations.remove(ep));
+        }
+
+        private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+        {
             dcRacks.get(current.left).remove(current.right, ep);
+            dcEndpoints.remove(current.left, ep);
+        }
+
+        void updateEndpoint(InetAddress ep)
+        {
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            if (snitch == null || !currentLocations.containsKey(ep))
+                return;
+
+            updateEndpoint(ep, snitch);
+        }
+
+        void updateEndpoints()
+        {
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            if (snitch == null)
+                return;
+
+            for (InetAddress ep : currentLocations.keySet())
+                updateEndpoint(ep, snitch);
+        }
+
+        private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+        {
+            Pair<String, String> current = currentLocations.get(ep);
+            String dc = snitch.getDatacenter(ep);
+            String rack = snitch.getRack(ep);
+            if (dc.equals(current.left) && rack.equals(current.right))
+                return;
+
+            doRemoveEndpoint(ep, current);
+            doAddEndpoint(ep, dc, rack);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
index 3237979..e6691c4 100644
--- a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
@@ -120,6 +120,7 @@ public class YamlFileNetworkTopologySnitch
                 protected void runMayThrow() throws ConfigurationException
                 {
                     loadTopologyConfiguration();
+                    StorageService.instance.updateTopology();
                 }
             };
             ResourceWatcher.watch(topologyConfigFilename, runnable,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5ac4980..c5f159e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1378,9 +1378,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
                         break;
                     case DC:
+                        updateTopology(endpoint);
                         SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
                         break;
                     case RACK:
+                        updateTopology(endpoint);
                         SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
                         break;
                     case RPC_ADDRESS:
@@ -1398,6 +1400,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public void updateTopology(InetAddress endpoint)
+    {
+        if (getTokenMetadata().isMember(endpoint))
+        {
+            getTokenMetadata().updateTopology(endpoint);
+        }
+    }
+
+    public void updateTopology()
+    {
+        getTokenMetadata().updateTopology();
+
+    }
+
     private void updatePeerInfo(InetAddress endpoint)
     {
         EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index 95118dc..fc8095d 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -19,19 +19,27 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Map;
+
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Multimap;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
+import static junit.framework.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 
 import static org.apache.cassandra.Util.token;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -50,9 +58,9 @@ public class TokenMetadataTest
         tmd.updateNormalToken(token(SIX), InetAddress.getByName("127.0.0.6"));
     }
 
-    private void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
+    private static void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
     {
-        ArrayList<Token> actual = new ArrayList<Token>();
+        ArrayList<Token> actual = new ArrayList<>();
         Iterators.addAll(actual, TokenMetadata.ringIterator(ring, token(start), includeMin));
         assertEquals(actual.toString(), expected.length, actual.size());
         for (int i = 0; i < expected.length; i++)
@@ -84,4 +92,199 @@ public class TokenMetadataTest
     {
         testRingIterator(new ArrayList<Token>(), "2", false);
     }
+
+    @Test
+    public void testTopologyUpdate_RackConsolidation() throws UnknownHostException
+    {
+        final InetAddress first = InetAddress.getByName("127.0.0.1");
+        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final String DATA_CENTER = "datacenter1";
+        final String RACK1 = "rack1";
+        final String RACK2 = "rack2";
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return endpoint.equals(first) ? RACK1 : RACK2;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tmd.updateNormalToken(token(ONE), first);
+        tmd.updateNormalToken(token(SIX), second);
+
+        TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap();
+        assertNotNull(tokenMetadata);
+
+        TokenMetadata.Topology topology = tokenMetadata.getTopology();
+        assertNotNull(topology);
+
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second));
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return RACK1;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tokenMetadata.updateTopology(first);
+        tokenMetadata.updateTopology(second);
+
+        allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertFalse(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(second));
+    }
+
+    @Test
+    public void testTopologyUpdate_RackExpansion() throws UnknownHostException
+    {
+        final InetAddress first = InetAddress.getByName("127.0.0.1");
+        final InetAddress second = InetAddress.getByName("127.0.0.6");
+        final String DATA_CENTER = "datacenter1";
+        final String RACK1 = "rack1";
+        final String RACK2 = "rack2";
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return RACK1;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tmd.updateNormalToken(token(ONE), first);
+        tmd.updateNormalToken(token(SIX), second);
+
+        TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap();
+        assertNotNull(tokenMetadata);
+
+        TokenMetadata.Topology topology = tokenMetadata.getTopology();
+        assertNotNull(topology);
+
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertFalse(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(second));
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return endpoint.equals(first) ? RACK1 : RACK2;
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return DATA_CENTER;
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+
+        tokenMetadata.updateTopology();
+
+        allEndpoints = topology.getDatacenterEndpoints();
+        assertNotNull(allEndpoints);
+        assertTrue(allEndpoints.size() == 2);
+        assertTrue(allEndpoints.containsKey(DATA_CENTER));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+        assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+        racks = topology.getDatacenterRacks();
+        assertNotNull(racks);
+        assertTrue(racks.size() == 1);
+        assertTrue(racks.containsKey(DATA_CENTER));
+        assertTrue(racks.get(DATA_CENTER).size() == 2);
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+        assertTrue(racks.get(DATA_CENTER).containsKey(RACK2));
+        assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+        assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second));
+    }
 }


[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b05b682
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b05b682
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b05b682

Branch: refs/heads/cassandra-2.2
Commit: 5b05b682641f634d8b6548d225b9130b771df969
Parents: e7c2952 4b1d59e
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Sep 16 11:48:56 2015 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 16 11:48:56 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../locator/GossipingPropertyFileSnitch.java    |   1 +
 .../cassandra/locator/PropertyFileSnitch.java   |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 142 +++++++++----
 .../cassandra/service/StorageService.java       |  16 ++
 .../cassandra/locator/TokenMetadataTest.java    | 209 ++++++++++++++++++-
 6 files changed, 332 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b05b682/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b0ade42,12523be..7b5fa78
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -20,8 -10,8 +20,9 @@@ Merged from 2.1
   * Only check KeyCache when it is enabled
   * Change streaming_socket_timeout_in_ms default to 1 hour (CASSANDRA-8611)
   * (cqlsh) update list of CQL keywords (CASSANDRA-9232)
 + * Add nodetool gettraceprobability command (CASSANDRA-10234)
  Merged from 2.0:
+  * Fix consolidating racks violating the RF contract (CASSANDRA-10238)
   * Disallow decommission when node is in drained state (CASSANDRA-8741)
  
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b05b682/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b05b682/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b05b682/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------


[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b1d59e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b1d59e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b1d59e1

Branch: refs/heads/cassandra-2.2
Commit: 4b1d59e13143f5a55478a7cfcaca61ba22259f9f
Parents: b8b4eb7 257cdaa
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Sep 16 11:44:28 2015 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 16 11:44:28 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../locator/GossipingPropertyFileSnitch.java    |   1 +
 .../cassandra/locator/PropertyFileSnitch.java   |   1 +
 .../apache/cassandra/locator/TokenMetadata.java | 142 +++++++++----
 .../locator/YamlFileNetworkTopologySnitch.java  |   1 +
 .../cassandra/service/StorageService.java       |  16 ++
 .../cassandra/locator/TokenMetadataTest.java    | 209 ++++++++++++++++++-
 7 files changed, 333 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1d59e1/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5f11049,3c47427..12523be
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,46 -1,7 +1,47 @@@
 -2.0.17
 +2.1.10
 + * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
 + * BATCH statement is broken in cqlsh (CASSANDRA-10272)
 + * Added configurable warning threshold for GC duration (CASSANDRA-8907)
 + * (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066)
 + * (cqlsh) Fix error when starting cqlsh with --debug (CASSANDRA-10282)
 + * Scrub, Cleanup and Upgrade do not unmark compacting until all operations
 +   have completed, regardless of the occurrence of exceptions (CASSANDRA-10274)
 + * Fix handling of streaming EOF (CASSANDRA-10206)
 + * Only check KeyCache when it is enabled
 + * Change streaming_socket_timeout_in_ms default to 1 hour (CASSANDRA-8611)
 + * (cqlsh) update list of CQL keywords (CASSANDRA-9232)
 +Merged from 2.0:
+  * Fix consolidating racks violating the RF contract (CASSANDRA-10238)
   * Disallow decommission when node is in drained state (CASSANDRA-8741)
 - * Backport CASSANDRA-8013 to 2.0 (CASSANDRA-10144)
 +
 +
 +2.1.9
 + * Avoid race condition during read repair (CASSANDRA-9460)
 + * (cqlsh) default load-from-file encoding to utf-8 (CASSANDRA-9898)
 + * Avoid returning Permission.NONE when failing to query users table (CASSANDRA-10168)
 + * (cqlsh) Allow encoding to be set through command line (CASSANDRA-10004)
 + * Add new JMX methods to change local compaction strategy (CASSANDRA-9965)
 + * Write hints for paxos commits (CASSANDRA-7342)
 + * (cqlsh) Fix timestamps before 1970 on Windows, always
 +   use UTC for timestamp display (CASSANDRA-10000)
 + * (cqlsh) Avoid overwriting new config file with old config
 +   when both exist (CASSANDRA-9777)
 + * Release snapshot selfRef when doing snapshot repair (CASSANDRA-9998)
 + * Cannot replace token does not exist - DN node removed as Fat Client (CASSANDRA-9871)
 + * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
 + * Commit log segment recycling is disabled by default (CASSANDRA-9896)
 + * Add consistency level to tracing output (CASSANDRA-9827)
 + * Fix MarshalException when upgrading superColumn family (CASSANDRA-9582)
 + * Fix broken logging for "empty" flushes in Memtable (CASSANDRA-9837)
 + * Handle corrupt files on startup (CASSANDRA-9686)
 + * Fix clientutil jar and tests (CASSANDRA-9760)
 + * (cqlsh) Allow the SSL protocol version to be specified through the
 +   config file or environment variables (CASSANDRA-9544)
 + * Remove repair snapshot leftover on startup (CASSANDRA-7357)
 + * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
 + * Ensure atomicity inside thrift and stream session (CASSANDRA-7757)
 + * Fix nodetool info error when the node is not joined (CASSANDRA-9031)
 +Merged from 2.0:
   * Make getFullyExpiredSSTables less expensive (CASSANDRA-9882)
   * Add tool to find why expired sstables are not getting dropped (CASSANDRA-10015)
   * Remove erroneous pending HH tasks from tpstats/jmx (CASSANDRA-9129)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1d59e1/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
index 2c0980a,dd1637d..da43600
--- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
@@@ -72,24 -62,7 +72,25 @@@ public class GossipingPropertyFileSnitc
          }
          catch (ConfigurationException e)
          {
 -            logger.info("Unable to load " + PropertyFileSnitch.SNITCH_PROPERTIES_FILENAME + "; compatibility mode disabled");
 +            logger.info("Unable to load {}; compatibility mode disabled", PropertyFileSnitch.SNITCH_PROPERTIES_FILENAME);
 +        }
 +
 +        try
 +        {
 +            FBUtilities.resourceToFile(SnitchProperties.RACKDC_PROPERTY_FILENAME);
 +            Runnable runnable = new WrappedRunnable()
 +            {
 +                protected void runMayThrow() throws ConfigurationException
 +                {
 +                    reloadConfiguration();
++                    StorageService.instance.updateTopology(FBUtilities.getBroadcastAddress());
 +                }
 +            };
 +            ResourceWatcher.watch(SnitchProperties.RACKDC_PROPERTY_FILENAME, runnable, refreshPeriodInSeconds * 1000);
 +        }
 +        catch (ConfigurationException ex)
 +        {
 +            logger.error("{} found, but does not look like a plain file. Will not watch it for changes", SnitchProperties.RACKDC_PROPERTY_FILENAME);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1d59e1/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1d59e1/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1d59e1/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1d59e1/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index acc82e6,c5f159e..f5950e3
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -1520,26 -1375,21 +1520,28 @@@ public class StorageService extends Not
                  switch (state)
                  {
                      case RELEASE_VERSION:
 -                        SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
 +                        SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
                          break;
                      case DC:
+                         updateTopology(endpoint);
 -                        SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
 +                        SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
                          break;
                      case RACK:
+                         updateTopology(endpoint);
 -                        SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
 +                        SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
                          break;
                      case RPC_ADDRESS:
 -                        SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", quote(value.value));
 +                        try
 +                        {
 +                            SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
 +                        }
 +                        catch (UnknownHostException e)
 +                        {
 +                            throw new RuntimeException(e);
 +                        }
                          break;
                      case SCHEMA:
 -                        SystemKeyspace.updatePeerInfo(endpoint, "schema_version", value.value);
 +                        SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
                          MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
                          break;
                      case HOST_ID: