You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/02/11 14:12:35 UTC
[2/3] cassandra git commit: Properly handle hinted handoff after topology changes
Properly handle hinted handoff after topology changes
patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-5902
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2f38ef1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2f38ef1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2f38ef1
Branch: refs/heads/trunk
Commit: b2f38ef177b5c19288f96d9bfe5304ec94391f73
Parents: 2a824c0
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Nov 18 16:26:37 2015 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Feb 11 13:06:16 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/batchlog/BatchlogManager.java | 3 +-
.../apache/cassandra/hints/HintVerbHandler.java | 22 +++-
.../cassandra/hints/HintsDispatchExecutor.java | 28 ++++-
.../apache/cassandra/hints/HintsDispatcher.java | 7 +-
.../apache/cassandra/hints/HintsService.java | 20 +++-
.../apache/cassandra/service/StorageProxy.java | 20 +++-
.../cassandra/service/StorageService.java | 8 ++
.../org/apache/cassandra/hints/HintTest.java | 109 +++++++++++++++++++
9 files changed, 199 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 828ba21..e723aba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.4
+ * Properly handle hinted handoff after topology changes (CASSANDRA-5902)
* AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156)
* Fix wrong rack counting and invalid conditions check for TokenAllocation
(CASSANDRA-11139)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 7ccc6f8..f5133bb 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -420,8 +420,7 @@ public class BatchlogManager implements BatchlogManagerMBean
String ks = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
- for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
- StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+ for (InetAddress endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk))
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
index b2c7b6a..d8838a9 100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
/**
@@ -72,12 +73,23 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage>
return;
}
- // Apply the hint if this node is the destination, store for future dispatch if this node isn't (must have gotten
- // it from a decommissioned node that had streamed it before going out).
- if (hostId.equals(StorageService.instance.getLocalHostUUID()))
- hint.apply();
- else
+ if (!hostId.equals(StorageService.instance.getLocalHostUUID()))
+ {
+ // the node is not the final destination of the hint (must have gotten it from a decommissioning node),
+ // so just store it locally, to be delivered later.
HintsService.instance.write(hostId, hint);
+ }
+ else if (!StorageProxy.instance.appliesLocally(hint.mutation))
+ {
+ // the topology has changed, and we are no longer a replica of the mutation - since we don't know which node(s)
+ // it has been handed over to, re-address the hint to all replicas; see CASSANDRA-5902.
+ HintsService.instance.writeForAllReplicas(hint);
+ }
+ else
+ {
+ // the common path - the node is both the destination and a valid replica for the hint.
+ hint.apply();
+ }
reply(id, message.from);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 7782d5d..5292dc1 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.hints;
import java.io.File;
+import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
@@ -236,10 +237,21 @@ final class HintsDispatchExecutor
{
logger.trace("Dispatching hints file {}", descriptor.fileName());
+ InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
+ if (address != null)
+ return deliver(descriptor, address);
+
+ // address == null means the target no longer exists; find a new home for each hint entry.
+ convert(descriptor);
+ return true;
+ }
+
+ private boolean deliver(HintsDescriptor descriptor, InetAddress address)
+ {
File file = new File(hintsDirectory, descriptor.fileName());
Long offset = store.getDispatchOffset(descriptor).orElse(null);
- try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused))
+ try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, isPaused))
{
if (offset != null)
dispatcher.seek(offset);
@@ -260,5 +272,19 @@ final class HintsDispatchExecutor
}
}
}
+
+ // for each hint in the hints file for a node that isn't part of the ring anymore, write one hint per current replica
+ private void convert(HintsDescriptor descriptor)
+ {
+ File file = new File(hintsDirectory, descriptor.fileName());
+
+ try (HintsReader reader = HintsReader.open(file, rateLimiter))
+ {
+ reader.forEach(page -> page.hintsIterator().forEachRemaining(HintsService.instance::writeForAllReplicas));
+ store.delete(descriptor);
+ store.cleanUp(descriptor);
+ logger.info("Finished converting hints file {}", descriptor.fileName());
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 94a6669..e582d88 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
/**
@@ -64,11 +63,10 @@ final class HintsDispatcher implements AutoCloseable
this.isPaused = isPaused;
}
- static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, UUID hintFor, AtomicBoolean isPaused)
+ static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, AtomicBoolean isPaused)
{
- InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
int messagingVersion = MessagingService.instance().getVersion(address);
- return new HintsDispatcher(HintsReader.open(file, rateLimiter), hintFor, address, messagingVersion, isPaused);
+ return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
}
public void close()
@@ -152,6 +150,7 @@ final class HintsDispatcher implements AutoCloseable
/*
* Sending hints in compatibility mode.
*/
+
private <T> Action sendHints(Iterator<T> hints, Collection<Callback> callbacks, Function<T, Callback> sendFunction)
{
while (hints.hasNext())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 5001af4..5a32786 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,9 +39,11 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.metrics.HintedHandoffMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Iterables.size;
@@ -167,6 +168,21 @@ public final class HintsService implements HintsServiceMBean
}
/**
+ * Write a hint for all replicas. Used to re-dispatch hints whose destination is either missing or no longer correct.
+ */
+ void writeForAllReplicas(Hint hint)
+ {
+ String keyspaceName = hint.mutation.getKeyspaceName();
+ Token token = hint.mutation.key().getToken();
+
+ Iterable<UUID> hostIds =
+ transform(filter(StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token), StorageProxy::shouldHint),
+ StorageService.instance::getHostIdForEndpoint);
+
+ write(hostIds, hint);
+ }
+
+ /**
* Flush the buffer pool for the selected target nodes, then fsync their writers.
*
* @param hostIds host ids of the nodes to flush and fsync hints for
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8fa2082..5cebf27 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -71,6 +71,8 @@ import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.AbstractIterator;
+import static com.google.common.collect.Iterables.contains;
+
public class StorageProxy implements StorageProxyMBean
{
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
@@ -670,12 +672,10 @@ public class StorageProxy implements StorageProxyMBean
private static void hintMutation(Mutation mutation)
{
- Token tk = DatabaseDescriptor.getPartitioner().getToken(mutation.key().getKey());
- List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
- Collection<InetAddress> pendingEndpoints =
- StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
+ String keyspaceName = mutation.getKeyspaceName();
+ Token token = mutation.key().getToken();
- Iterable<InetAddress> endpoints = Iterables.concat(naturalEndpoints, pendingEndpoints);
+ Iterable<InetAddress> endpoints = StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token);
ArrayList<InetAddress> endpointsToHint = new ArrayList<>(Iterables.size(endpoints));
// local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
@@ -687,6 +687,16 @@ public class StorageProxy implements StorageProxyMBean
submitHint(mutation, endpointsToHint, null);
}
+ public boolean appliesLocally(Mutation mutation)
+ {
+ String keyspaceName = mutation.getKeyspaceName();
+ Token token = mutation.key().getToken();
+ InetAddress local = FBUtilities.getBroadcastAddress();
+
+ return StorageService.instance.getNaturalEndpoints(keyspaceName, token).contains(local)
+ || StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, keyspaceName).contains(local);
+ }
+
/**
* Use this method to have these Mutations applied
* across all replicas.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 259bd10..4cdeeb0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3301,6 +3301,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
+ * Returns the endpoints currently responsible for storing the token plus pending ones
+ */
+ public Iterable<InetAddress> getNaturalAndPendingEndpoints(String keyspaceName, Token token)
+ {
+ return Iterables.concat(getNaturalEndpoints(keyspaceName, token), tokenMetadata.pendingEndpointsFor(token, keyspaceName));
+ }
+
+ /**
* This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/test/unit/org/apache/cassandra/hints/HintTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintTest.java b/test/unit/org/apache/cassandra/hints/HintTest.java
index 4c7ec70..1d486e1 100644
--- a/test/unit/org/apache/cassandra/hints/HintTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintTest.java
@@ -18,6 +18,11 @@
package org.apache.cassandra.hints;
import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableList;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -26,17 +31,24 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import static junit.framework.Assert.*;
@@ -67,6 +79,12 @@ public class HintTest
@Before
public void resetGcGraceSeconds()
{
+ TokenMetadata tokenMeta = StorageService.instance.getTokenMetadata();
+ InetAddress local = FBUtilities.getBroadcastAddress();
+ tokenMeta.clearUnsafe();
+ tokenMeta.updateHostId(UUID.randomUUID(), local);
+ tokenMeta.updateNormalTokens(BootStrapper.getRandomTokens(tokenMeta, 1), local);
+
for (CFMetaData table : Schema.instance.getTablesAndViews(KEYSPACE))
table.gcGraceSeconds(TableParams.DEFAULT_GC_GRACE_SECONDS);
}
@@ -185,6 +203,97 @@ public class HintTest
assertNoPartitions(key, TABLE2);
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testChangedTopology() throws Exception
+ {
+ // create a hint
+ long now = FBUtilities.timestampMicros();
+ String key = "testChangedTopology";
+ Mutation mutation = createMutation(key, now);
+ Hint hint = Hint.create(mutation, now / 1000);
+
+ // Prepare metadata with injected stale endpoint serving the mutation key.
+ TokenMetadata tokenMeta = StorageService.instance.getTokenMetadata();
+ InetAddress local = FBUtilities.getBroadcastAddress();
+ InetAddress endpoint = InetAddress.getByName("1.1.1.1");
+ UUID localId = StorageService.instance.getLocalHostUUID();
+ UUID targetId = UUID.randomUUID();
+ tokenMeta.updateHostId(targetId, endpoint);
+ tokenMeta.updateNormalTokens(ImmutableList.of(mutation.key().getToken()), endpoint);
+
+ // sanity check that there is no data inside yet
+ assertNoPartitions(key, TABLE0);
+ assertNoPartitions(key, TABLE1);
+ assertNoPartitions(key, TABLE2);
+
+ assert StorageProxy.instance.getHintsInProgress() == 0;
+ long totalHintCount = StorageProxy.instance.getTotalHints();
+ // Process hint message.
+ HintMessage message = new HintMessage(localId, hint);
+ MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb(
+ MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version),
+ -1);
+
+ // hint should not be applied as we are no longer a replica
+ assertNoPartitions(key, TABLE0);
+ assertNoPartitions(key, TABLE1);
+ assertNoPartitions(key, TABLE2);
+
+ // An attempt to send to the new endpoint should have been made. The node is not live, so it should now be stored as a hint.
+ assertEquals(totalHintCount + 1, StorageProxy.instance.getTotalHints());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testChangedTopologyNotHintable() throws Exception
+ {
+ // create a hint
+ long now = FBUtilities.timestampMicros();
+ String key = "testChangedTopology";
+ Mutation mutation = createMutation(key, now);
+ Hint hint = Hint.create(mutation, now / 1000);
+
+ // Prepare metadata with injected stale endpoint.
+ TokenMetadata tokenMeta = StorageService.instance.getTokenMetadata();
+ InetAddress local = FBUtilities.getBroadcastAddress();
+ InetAddress endpoint = InetAddress.getByName("1.1.1.1");
+ UUID localId = StorageService.instance.getLocalHostUUID();
+ UUID targetId = UUID.randomUUID();
+ tokenMeta.updateHostId(targetId, endpoint);
+ tokenMeta.updateNormalTokens(ImmutableList.of(mutation.key().getToken()), endpoint);
+
+ // sanity check that there is no data inside yet
+ assertNoPartitions(key, TABLE0);
+ assertNoPartitions(key, TABLE1);
+ assertNoPartitions(key, TABLE2);
+
+ try
+ {
+ DatabaseDescriptor.setHintedHandoffEnabled(false);
+
+ assert StorageMetrics.totalHintsInProgress.getCount() == 0;
+ long totalHintCount = StorageMetrics.totalHints.getCount();
+ // Process hint message.
+ HintMessage message = new HintMessage(localId, hint);
+ MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb(
+ MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version),
+ -1);
+
+ // hint should not be applied as we are no longer a replica
+ assertNoPartitions(key, TABLE0);
+ assertNoPartitions(key, TABLE1);
+ assertNoPartitions(key, TABLE2);
+
+ // No attempt to send to the new endpoint should have been made.
+ assertEquals(totalHintCount, StorageMetrics.totalHints.getCount());
+ }
+ finally
+ {
+ DatabaseDescriptor.setHintedHandoffEnabled(true);
+ }
+ }
+
private static Mutation createMutation(String key, long now)
{
Mutation mutation = new Mutation(KEYSPACE, dk(key));