You are viewing a plain text version of this content. The canonical link to the original was not preserved in this version.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/18 20:16:29 UTC
cassandra git commit: Transfer hints to a different node on decommission
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 a29d20653 -> 959b96efe
Transfer hints to a different node on decommission
patch by Marcus Eriksson; reviewed by Aleksey Yeschenko for
CASSANDRA-10198
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/959b96ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/959b96ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/959b96ef
Branch: refs/heads/cassandra-3.0
Commit: 959b96efee613363fde28de1e2b34aa9201efd7a
Parents: a29d206
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Sep 1 15:37:37 2015 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Sep 18 19:14:41 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/hints/HintsCatalog.java | 8 +++
.../cassandra/hints/HintsDispatchExecutor.java | 64 +++++++++++++++++++-
.../apache/cassandra/hints/HintsDispatcher.java | 4 +-
.../apache/cassandra/hints/HintsService.java | 45 ++++++++++++--
.../cassandra/service/StorageService.java | 29 ++++-----
6 files changed, 124 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 775d8a4..d95d833 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-rc1
+ * Transfer hints to a different node on decommission (CASSANDRA-10198)
* Check partition keys for CAS operations during stmt validation (CASSANDRA-10338)
* Add custom query expressions to SELECT (CASSANDRA-10217)
* Fix minor bugs in MV handling (CASSANDRA-10362)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsCatalog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java
index 13404ee..cb8e1fd 100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@ -110,6 +110,14 @@ final class HintsCatalog
store.deleteAllHints();
}
+ /**
+ * @return true if at least one of the stores has a file pending dispatch
+ */
+ boolean hasFiles()
+ {
+ return stores().anyMatch(HintsStore::hasFiles);
+ }
+
void exciseStore(UUID hostId)
{
deleteAllHints(hostId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index ab7bc7f..1f0191a 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import com.google.common.util.concurrent.RateLimiter;
@@ -93,7 +94,12 @@ final class HintsDispatchExecutor
*
* It also simplifies reasoning about dispatch sessions.
*/
- return scheduledDispatches.computeIfAbsent(store.hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
+ return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
+ }
+
+ Future transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
+ {
+ return executor.submit(new TransferHintsTask(catalog, hostIdSupplier));
}
void completeDispatchBlockingly(HintsStore store)
@@ -110,6 +116,60 @@ final class HintsDispatchExecutor
}
}
+ private final class TransferHintsTask implements Runnable
+ {
+ private final HintsCatalog catalog;
+
+ /*
+ * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest.
+ * We use a supplier here to be able to get a new host if the current one dies during streaming.
+ */
+ private final Supplier<UUID> hostIdSupplier;
+
+ private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
+ {
+ this.catalog = catalog;
+ this.hostIdSupplier = hostIdSupplier;
+ }
+
+ @Override
+ public void run()
+ {
+ UUID hostId = hostIdSupplier.get();
+ logger.info("Transferring all hints to {}", hostId);
+ if (transfer(hostId))
+ return;
+
+ logger.warn("Failed to transfer all hints to {}; will retry in {} seconds", hostId, 10);
+
+ try
+ {
+ TimeUnit.SECONDS.sleep(10);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ hostId = hostIdSupplier.get();
+ logger.info("Transferring all hints to {}", hostId);
+ if (!transfer(hostId))
+ {
+ logger.error("Failed to transfer all hints to {}", hostId);
+ throw new RuntimeException("Failed to transfer all hints to " + hostId);
+ }
+ }
+
+ private boolean transfer(UUID hostId)
+ {
+ catalog.stores()
+ .map(store -> new DispatchHintsTask(store, hostId))
+ .forEach(Runnable::run);
+
+ return !catalog.hasFiles();
+ }
+ }
+
private final class DispatchHintsTask implements Runnable
{
private final HintsStore store;
@@ -179,7 +239,7 @@ final class HintsDispatchExecutor
File file = new File(hintsDirectory, descriptor.fileName());
Long offset = store.getDispatchOffset(descriptor).orElse(null);
- try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, isPaused))
+ try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused))
{
if (offset != null)
dispatcher.seek(offset);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index f769e09..94a6669 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -64,11 +64,11 @@ final class HintsDispatcher implements AutoCloseable
this.isPaused = isPaused;
}
- static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, AtomicBoolean isPaused)
+ static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, UUID hintFor, AtomicBoolean isPaused)
{
InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
int messagingVersion = MessagingService.instance().getVersion(address);
- return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
+ return new HintsDispatcher(HintsReader.open(file, rateLimiter), hintFor, address, messagingVersion, isPaused);
}
public void close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 3f30c1d..6aed07f 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -202,11 +203,6 @@ public final class HintsService implements HintsServiceMBean
writeExecutor.shutdownBlocking();
}
- public void decommission()
- {
- resumeDispatch();
- }
-
/**
* Deletes all hints for all destinations. Doesn't make snapshots - should be used with care.
*/
@@ -288,4 +284,43 @@ public final class HintsService implements HintsServiceMBean
// delete all the hints files and remove the HintsStore instance from the map in the catalog
catalog.exciseStore(hostId);
}
+
+ /**
+ * Transfer all local hints to the hostId supplied by hostIdSupplier
+ *
+ * Flushes the buffer to make sure all hints are on disk and closes the hint writers
+ * so we don't leave any hint files around.
+ *
+ * After that, we serially dispatch all the hints in the HintsCatalog.
+ *
+ * If we fail delivering all hints, we will ask the hostIdSupplier for a new target host
+ * and retry delivering any remaining hints there, once, with a delay of 10 seconds before retrying.
+ *
+ * @param hostIdSupplier supplier of stream target host ids. This is generally
+ * the closest one according to the DynamicSnitch
+ * @return When this future is done, it either has streamed all hints to remote nodes or has failed with a proper
+ * log message
+ */
+ public Future transferHints(Supplier<UUID> hostIdSupplier)
+ {
+ Future flushFuture = writeExecutor.flushBufferPool(bufferPool);
+ Future closeFuture = writeExecutor.closeAllWriters();
+ try
+ {
+ flushFuture.get();
+ closeFuture.get();
+ }
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ // unpause dispatch, or else transfer() will return immediately
+ resumeDispatch();
+
+ // wait for the current dispatch session to end
+ catalog.stores().forEach(dispatchExecutor::completeDispatchBlockingly);
+
+ return dispatchExecutor.transfer(catalog, hostIdSupplier);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/959b96ef/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index dd428b6..0a8717f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3438,7 +3438,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
setMode(Mode.LEAVING, "streaming hints to other nodes", true);
- Future<StreamState> hintsSuccess = streamHints();
+ Future hintsSuccess = streamHints();
// wait for the transfer runnables to signal the latch.
logger.debug("waiting for stream acks.");
@@ -3456,13 +3456,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
onFinish.run();
}
- private Future<StreamState> streamHints()
+ private Future streamHints()
{
- // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
- ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS);
- FBUtilities.waitOnFuture(hintsCF.forceFlush());
+ return HintsService.instance.transferHints(this::getPreferredHintsStreamTarget);
+ }
- // gather all live nodes in the cluster that aren't also leaving
+ /**
+ * Find the best target to stream hints to. Currently the closest peer according to the snitch
+ */
+ private UUID getPreferredHintsStreamTarget()
+ {
List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
candidates.remove(FBUtilities.getBroadcastAddress());
for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
@@ -3475,7 +3478,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (candidates.isEmpty())
{
logger.warn("Unable to stream hints since no live endpoints seen");
- return Futures.immediateFuture(null);
+ throw new RuntimeException("Unable to stream hints since no live endpoints seen");
}
else
{
@@ -3483,17 +3486,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
InetAddress hintsDestinationHost = candidates.get(0);
InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost);
-
- // stream all hints -- range list will be a singleton of "the entire ring"
- Token token = tokenMetadata.partitioner.getMinimumToken();
- List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token));
-
- return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
- preferred,
- SystemKeyspace.NAME,
- ranges,
- SystemKeyspace.LEGACY_HINTS)
- .execute();
+ return tokenMetadata.getHostId(preferred);
}
}