You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/01/24 20:44:17 UTC

[1/3] git commit: stream undelivered hints on decommission; patch by Jason Brown; reviewed by jbellis for CASSANDRA-5128

stream undelivered hints on decommission
patch by Jason Brown; reviewed by jbellis for CASSANDRA-5128


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc66c73b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc66c73b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc66c73b

Branch: refs/heads/cassandra-1.2
Commit: cc66c73ba60ec3f062e3fd699b76e352f8b10f6d
Parents: 360d1a2
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jan 24 13:42:37 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jan 24 13:42:54 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/locator/TokenMetadata.java    |   17 ++-
 .../apache/cassandra/service/StorageService.java   |   86 ++++++++++++---
 .../org/apache/cassandra/streaming/StreamOut.java  |   10 ++-
 4 files changed, 94 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 812abdd..3a53a34 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.1
+ * stream undelivered hints on decommission (CASSANDRA-5128)
  * GossipingPropertyFileSnitch loads saved dc/rack info if needed (CASSANDRA-5133)
  * drain should flush system CFs too (CASSANDRA-4446)
  * add inter_dc_tcp_nodelay setting (CASSANDRA-5148)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 3b5f86d..925a811 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -105,14 +105,16 @@ public class TokenMetadata
 
     public TokenMetadata()
     {
-        this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), new Topology());
+        this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
+             HashBiMap.<InetAddress, UUID>create(),
+             new Topology());
     }
 
-    public TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, Topology topology)
+    private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology)
     {
         this.tokenToEndpointMap = tokenToEndpointMap;
         this.topology = topology;
-        endpointToHostIdMap = HashBiMap.create();
+        endpointToHostIdMap = endpointsMap;
         sortedTokens = sortTokens();
     }
 
@@ -556,7 +558,9 @@ public class TokenMetadata
         lock.readLock().lock();
         try
         {
-            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), new Topology(topology));
+            return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+                                     HashBiMap.create(endpointToHostIdMap),
+                                     new Topology(topology));
         }
         finally
         {
@@ -719,6 +723,11 @@ public class TokenMetadata
         }
     }
 
+    public Set<InetAddress> getAllEndpoints()
+    {
+        return endpointToHostIdMap.keySet();
+    }
+
     /** caller should not modify leavingEndpoints */
     public Set<InetAddress> getLeavingEndpoints()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2f178b6..21f7c69 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2688,12 +2688,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         setMode(Mode.LEAVING, "streaming data to other nodes", true);
 
         CountDownLatch latch = streamRanges(rangesToStream);
+        CountDownLatch hintsLatch = streamHints();
 
         // wait for the transfer runnables to signal the latch.
         logger.debug("waiting for stream aks.");
         try
         {
             latch.await();
+            hintsLatch.await();
         }
         catch (InterruptedException e)
         {
@@ -2704,6 +2706,47 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         onFinish.run();
     }
 
+    private CountDownLatch streamHints()
+    {
+        if (HintedHandOffManager.instance.listEndpointsPendingHints().size() == 0)
+            return new CountDownLatch(0);
+
+        // gather all live nodes in the cluster that aren't also leaving
+        List<InetAddress> candidates = new ArrayList<InetAddress>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
+        candidates.remove(FBUtilities.getBroadcastAddress());
+        for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
+        {
+            InetAddress address = iter.next();
+            if (!FailureDetector.instance.isAlive(address))
+                iter.remove();
+        }
+
+        if (candidates.isEmpty())
+        {
+            logger.warn("Unable to stream hints since no live endpoints seen");
+            return new CountDownLatch(0);
+        }
+        else
+        {
+            // stream to the closest peer as chosen by the snitch
+            DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
+            InetAddress hintsDestinationHost = candidates.get(0);
+
+            // stream all hints -- range list will be a singleton of "the entire ring"
+            Token token = StorageService.getPartitioner().getMinimumToken();
+            List<Range<Token>> ranges = Collections.singletonList(new Range<Token>(token, token));
+
+            CountDownLatch latch = new CountDownLatch(1);
+            StreamOut.transferRanges(hintsDestinationHost,
+                                     Table.open(Table.SYSTEM_KS),
+                                     Collections.singletonList(Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF)),
+                                     ranges,
+                                     new CountingDownStreamCallback(latch, hintsDestinationHost),
+                                     OperationType.UNBOOTSTRAP);
+            return latch;
+        }
+    }
+
     public void move(String newToken) throws IOException
     {
         try
@@ -3474,27 +3517,40 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 final List<Range<Token>> ranges = rangesEntry.getValue();
                 final InetAddress newEndpoint = rangesEntry.getKey();
 
-                final IStreamCallback callback = new IStreamCallback()
-                {
-                    public void onSuccess()
-                    {
-                        latch.countDown();
-                    }
-
-                    public void onFailure()
-                    {
-                        logger.warn("Streaming to " + newEndpoint + " failed");
-                        onSuccess(); // calling onSuccess for latch countdown
-                    }
-                };
-
                 // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
-                StreamOut.transferRanges(newEndpoint, Table.open(table), ranges, callback, OperationType.UNBOOTSTRAP);
+                StreamOut.transferRanges(newEndpoint,
+                                         Table.open(table),
+                                         ranges,
+                                         new CountingDownStreamCallback(latch, newEndpoint),
+                                         OperationType.UNBOOTSTRAP);
             }
         }
         return latch;
     }
 
+    class CountingDownStreamCallback implements IStreamCallback
+    {
+        private final CountDownLatch latch;
+        private final InetAddress targetAddr;
+
+        CountingDownStreamCallback(CountDownLatch latch, InetAddress targetAddr)
+        {
+            this.latch = latch;
+            this.targetAddr = targetAddr;
+        }
+
+        public void onSuccess()
+        {
+            latch.countDown();
+        }
+
+        public void onFailure()
+        {
+            logger.warn("Streaming to " + targetAddr + " failed");
+            onSuccess(); // calling onSuccess for latch countdown
+        }
+    };
+
     /**
      * Used to request ranges from endpoints in the ring (will block until all data is fetched and ready)
      * @param ranges ranges to fetch as map of the preferred address and range collection

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc66c73b/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 7043be4..7855d6b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -80,8 +80,16 @@ public class StreamOut
     */
     public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
     {
+        transferRanges(target, table, table.getColumnFamilyStores(), ranges, callback, type);
+    }
+
+    /**
+     * Stream the given ranges to the target endpoint for provided CFs in the given keyspace.
+     */
+    public static void transferRanges(InetAddress target, Table table, Iterable<ColumnFamilyStore> cfses, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type)
+    {
         StreamOutSession session = StreamOutSession.create(table.name, target, callback);
-        transferRanges(session, table.getColumnFamilyStores(), ranges, type);
+        transferRanges(session, cfses, ranges, type);
     }
 
     /**