You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this text.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/18 20:16:29 UTC

cassandra git commit: Transfer hints to a different node on decommission

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 a29d20653 -> 959b96efe


Transfer hints to a different node on decommission

patch by Marcus Eriksson; reviewed by Aleksey Yeschenko for
CASSANDRA-10198


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/959b96ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/959b96ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/959b96ef

Branch: refs/heads/cassandra-3.0
Commit: 959b96efee613363fde28de1e2b34aa9201efd7a
Parents: a29d206
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Sep 1 15:37:37 2015 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Sep 18 19:14:41 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/hints/HintsCatalog.java    |  8 +++
 .../cassandra/hints/HintsDispatchExecutor.java  | 64 +++++++++++++++++++-
 .../apache/cassandra/hints/HintsDispatcher.java |  4 +-
 .../apache/cassandra/hints/HintsService.java    | 45 ++++++++++++--
 .../cassandra/service/StorageService.java       | 29 ++++-----
 6 files changed, 124 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 775d8a4..d95d833 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Transfer hints to a different node on decommission (CASSANDRA-10198)
  * Check partition keys for CAS operations during stmt validation (CASSANDRA-10338)
  * Add custom query expressions to SELECT (CASSANDRA-10217)
  * Fix minor bugs in MV handling (CASSANDRA-10362)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index 13404ee..cb8e1fd 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -110,6 +110,14 @@ final class HintsCatalog
             store.deleteAllHints();
     }
 
+    /** @return true if any store in this catalog still has a hints file pending dispatch */
+    boolean hasFiles()
+    {
+        // anyMatch stops at the first store that reports a pending file
+        boolean pending = stores().anyMatch(HintsStore::hasFiles);
+        return pending;
+    }
+
     void exciseStore(UUID hostId)
     {
         deleteAllHints(hostId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index ab7bc7f..1f0191a 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -93,7 +94,12 @@ final class HintsDispatchExecutor
          *
          * It also simplifies reasoning about dispatch sessions.
          */
-        return scheduledDispatches.computeIfAbsent(store.hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
+        return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
+    }
+
+    Future<?> transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
+    {
+        return executor.submit(new TransferHintsTask(catalog, hostIdSupplier)); // completes when the task's run() returns or throws
     }
 
     void completeDispatchBlockingly(HintsStore store)
@@ -110,6 +116,60 @@ final class HintsDispatchExecutor
         }
     }
 
+    private final class TransferHintsTask implements Runnable
+    {
+        private static final int RETRY_DELAY_SECONDS = 10; // pause before the single retry
+
+        private final HintsCatalog catalog;
+
+        /*
+         * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest.
+         * We use a supplier here to be able to get a new host if the current one dies during streaming.
+         */
+        private final Supplier<UUID> hostIdSupplier;
+
+        private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
+        {
+            this.catalog = catalog;
+            this.hostIdSupplier = hostIdSupplier;
+        }
+
+        // One attempt, then a single retry (re-querying the supplier for a host) after a delay; throws if hints remain.
+        @Override
+        public void run()
+        {
+            UUID hostId = hostIdSupplier.get();
+            logger.info("Transferring all hints to {}", hostId);
+            if (transfer(hostId))
+                return;
+
+            logger.warn("Failed to transfer all hints to {}; will retry in {} seconds", hostId, RETRY_DELAY_SECONDS);
+            try
+            {
+                TimeUnit.SECONDS.sleep(RETRY_DELAY_SECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt(); // restore the interrupt status before propagating
+                throw new RuntimeException(e);
+            }
+            hostId = hostIdSupplier.get();
+            logger.info("Transferring all hints to {}", hostId);
+            if (!transfer(hostId))
+            {
+                logger.error("Failed to transfer all hints to {}", hostId);
+                throw new RuntimeException("Failed to transfer all hints to " + hostId);
+            }
+        }
+
+        // Synchronously runs a DispatchHintsTask for each store; true iff no hints files remain afterwards.
+        private boolean transfer(UUID hostId)
+        {
+            catalog.stores().map(store -> new DispatchHintsTask(store, hostId)).forEach(Runnable::run);
+            return !catalog.hasFiles();
+        }
+    }
+
     private final class DispatchHintsTask implements Runnable
     {
         private final HintsStore store;
@@ -179,7 +239,7 @@ final class HintsDispatchExecutor
             File file = new File(hintsDirectory, descriptor.fileName());
             Long offset = store.getDispatchOffset(descriptor).orElse(null);
 
-            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, isPaused))
+            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused))
             {
                 if (offset != null)
                     dispatcher.seek(offset);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index f769e09..94a6669 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -64,11 +64,11 @@ final class HintsDispatcher implements AutoCloseable
         this.isPaused = isPaused;
     }
 
-    static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, AtomicBoolean isPaused)
+    static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, UUID hintFor, AtomicBoolean isPaused)
     {
         InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
         int messagingVersion = MessagingService.instance().getVersion(address);
-        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
+        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hintFor, address, messagingVersion, isPaused); // hostId only picks the destination address; hintFor is what the dispatcher is built with
     }
 
     public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 3f30c1d..6aed07f 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -202,11 +203,6 @@ public final class HintsService implements HintsServiceMBean
         writeExecutor.shutdownBlocking();
     }
 
-    public void decommission()
-    {
-        resumeDispatch();
-    }
-
     /**
      * Deletes all hints for all destinations. Doesn't make snapshots - should be used with care.
      */
@@ -288,4 +284,50 @@ public final class HintsService implements HintsServiceMBean
         // delete all the hints files and remove the HintsStore instance from the map in the catalog
         catalog.exciseStore(hostId);
     }
+
+    /**
+     * Transfer all local hints to the hostId supplied by hostIdSupplier.
+     *
+     * Flushes the buffer to make sure all hints are on disk and closes the hint writers
+     * so we don't leave any hint files around.
+     *
+     * After that, we serially dispatch all the hints in the HintsCatalog.
+     *
+     * If we fail delivering all hints, we will ask the hostIdSupplier for a new target host
+     * and retry delivering any remaining hints there, once, with a delay of 10 seconds before retrying.
+     *
+     * @param hostIdSupplier supplier of stream target host ids. This is generally
+     *                       the closest one according to the DynamicSnitch
+     * @return When this future is done, it either has streamed all hints to remote nodes or has failed with a proper
+     *         log message
+     * @throws RuntimeException if flushing the buffers or closing the writers fails or is interrupted
+     *                          (in the interrupted case the thread's interrupt status is restored first)
+     */
+    public Future<?> transferHints(Supplier<UUID> hostIdSupplier)
+    {
+        Future<?> flushFuture = writeExecutor.flushBufferPool(bufferPool);
+        Future<?> closeFuture = writeExecutor.closeAllWriters();
+        try
+        {
+            flushFuture.get();
+            closeFuture.get();
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        // unpause dispatch, or else transfer() will return immediately
+        resumeDispatch();
+
+        // wait for the current dispatch session to end
+        catalog.stores().forEach(dispatchExecutor::completeDispatchBlockingly);
+
+        return dispatchExecutor.transfer(catalog, hostIdSupplier);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index dd428b6..0a8717f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3438,7 +3438,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         setMode(Mode.LEAVING, "streaming hints to other nodes", true);
 
-        Future<StreamState> hintsSuccess = streamHints();
+        Future hintsSuccess = streamHints();
 
         // wait for the transfer runnables to signal the latch.
         logger.debug("waiting for stream acks.");
@@ -3456,13 +3456,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         onFinish.run();
     }
 
-    private Future<StreamState> streamHints()
+    private Future<?> streamHints()
     {
-        // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
-        ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
-        FBUtilities.waitOnFuture(hintsCF.forceFlush());
+        return HintsService.instance.transferHints(this::getPreferredHintsStreamTarget); // target is re-queried on retry
+    }
 
-        // gather all live nodes in the cluster that aren't also leaving
+    /**
+     * Find the best target to stream hints to: currently the live peer the snitch ranks closest (throws if there is none)
+     */
+    private UUID getPreferredHintsStreamTarget()
+    {
         List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
         candidates.remove(FBUtilities.getBroadcastAddress());
         for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
@@ -3475,7 +3478,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (candidates.isEmpty())
         {
             logger.warn("Unable to stream hints since no live endpoints seen");
-            return Futures.immediateFuture(null);
+            throw new RuntimeException("Unable to stream hints since no live endpoints seen");
         }
         else
         {
@@ -3483,17 +3486,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
             InetAddress hintsDestinationHost = candidates.get(0);
-            InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost);
-
-            // stream all hints -- range list will be a singleton of "the entire ring"
-            Token token = tokenMetadata.partitioner.getMinimumToken();
-            List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token));
-
-            return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
-                                                          preferred,
-                                                          SystemKeyspace.NAME,
-                                                          ranges,
-                                                          SystemKeyspace.LEGACY_HINTS)
-                                          .execute();
+            // candidates come from TokenMetadata, so resolve the host id by that endpoint, not by its (possibly different) preferred IP
+            return tokenMetadata.getHostId(hintsDestinationHost);
         }
     }