You are viewing a plain-text version of this content; the hyperlink to its canonical version did not survive the plain-text rendering.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/02/11 14:12:35 UTC

[2/3] cassandra git commit: Properly handle hinted handoff after topology changes

Properly handle hinted handoff after topology changes

patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-5902


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2f38ef1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2f38ef1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2f38ef1

Branch: refs/heads/trunk
Commit: b2f38ef177b5c19288f96d9bfe5304ec94391f73
Parents: 2a824c0
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Nov 18 16:26:37 2015 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Feb 11 13:06:16 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/batchlog/BatchlogManager.java     |   3 +-
 .../apache/cassandra/hints/HintVerbHandler.java |  22 +++-
 .../cassandra/hints/HintsDispatchExecutor.java  |  28 ++++-
 .../apache/cassandra/hints/HintsDispatcher.java |   7 +-
 .../apache/cassandra/hints/HintsService.java    |  20 +++-
 .../apache/cassandra/service/StorageProxy.java  |  20 +++-
 .../cassandra/service/StorageService.java       |   8 ++
 .../org/apache/cassandra/hints/HintTest.java    | 109 +++++++++++++++++++
 9 files changed, 199 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 828ba21..e723aba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.4
+ * Properly handle hinted handoff after topology changes (CASSANDRA-5902)
  * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156)
  * Fix wrong rack counting and invalid conditions check for TokenAllocation
    (CASSANDRA-11139)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 7ccc6f8..f5133bb 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -420,8 +420,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             String ks = mutation.getKeyspaceName();
             Token tk = mutation.key().getToken();
 
-            for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
-                                                         StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+            for (InetAddress endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk))
             {
                 if (endpoint.equals(FBUtilities.getBroadcastAddress()))
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
index b2c7b6a..d8838a9 100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 
 /**
@@ -72,12 +73,23 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage>
             return;
         }
 
-        // Apply the hint if this node is the destination, store for future dispatch if this node isn't (must have gotten
-        // it from a decommissioned node that had streamed it before going out).
-        if (hostId.equals(StorageService.instance.getLocalHostUUID()))
-            hint.apply();
-        else
+        if (!hostId.equals(StorageService.instance.getLocalHostUUID()))
+        {
+            // the node is not the final destination of the hint (must have gotten it from a decommissioning node),
+            // so just store it locally, to be delivered later.
             HintsService.instance.write(hostId, hint);
+        }
+        else if (!StorageProxy.instance.appliesLocally(hint.mutation))
+        {
+            // the topology has changed, and we are no longer a replica of the mutation - since we don't know which node(s)
+            // it has been handed over to, re-address the hint to all replicas; see CASSANDRA-5902.
+            HintsService.instance.writeForAllReplicas(hint);
+        }
+        else
+        {
+            // the common path - the node is both the destination and a valid replica for the hint.
+            hint.apply();
+        }
 
         reply(id, message.from);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 7782d5d..5292dc1 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.hints;
 
 import java.io.File;
+import java.net.InetAddress;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.*;
@@ -236,10 +237,21 @@ final class HintsDispatchExecutor
         {
             logger.trace("Dispatching hints file {}", descriptor.fileName());
 
+            InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
+            if (address != null)
+                return deliver(descriptor, address);
+
+            // address == null means the target no longer exist; find new home for each hint entry.
+            convert(descriptor);
+            return true;
+        }
+
+        private boolean deliver(HintsDescriptor descriptor, InetAddress address)
+        {
             File file = new File(hintsDirectory, descriptor.fileName());
             Long offset = store.getDispatchOffset(descriptor).orElse(null);
 
-            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused))
+            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, isPaused))
             {
                 if (offset != null)
                     dispatcher.seek(offset);
@@ -260,5 +272,19 @@ final class HintsDispatchExecutor
                 }
             }
         }
+
+        // for each hint in the hints file for a node that isn't part of the ring anymore, write RF hints for each replica
+        private void convert(HintsDescriptor descriptor)
+        {
+            File file = new File(hintsDirectory, descriptor.fileName());
+
+            try (HintsReader reader = HintsReader.open(file, rateLimiter))
+            {
+                reader.forEach(page -> page.hintsIterator().forEachRemaining(HintsService.instance::writeForAllReplicas));
+                store.delete(descriptor);
+                store.cleanUp(descriptor);
+                logger.info("Finished converting hints file {}", descriptor.fileName());
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsDispatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
index 94a6669..e582d88 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatcher.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 /**
@@ -64,11 +63,10 @@ final class HintsDispatcher implements AutoCloseable
         this.isPaused = isPaused;
     }
 
-    static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, UUID hintFor, AtomicBoolean isPaused)
+    static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, AtomicBoolean isPaused)
     {
-        InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
         int messagingVersion = MessagingService.instance().getVersion(address);
-        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hintFor, address, messagingVersion, isPaused);
+        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, isPaused);
     }
 
     public void close()
@@ -152,6 +150,7 @@ final class HintsDispatcher implements AutoCloseable
     /*
      * Sending hints in compatibility mode.
      */
+
     private <T> Action sendHints(Iterator<T> hints, Collection<Callback> callbacks, Function<T, Callback> sendFunction)
     {
         while (hints.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 5001af4..5a32786 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,9 +39,11 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 
+import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Iterables.size;
 
@@ -167,6 +168,21 @@ public final class HintsService implements HintsServiceMBean
     }
 
     /**
+     * Write a hint for all replicas. Used to re-dispatch hints whose destination is either missing or no longer correct.
+     */
+    void writeForAllReplicas(Hint hint)
+    {
+        String keyspaceName = hint.mutation.getKeyspaceName();
+        Token token = hint.mutation.key().getToken();
+
+        Iterable<UUID> hostIds =
+        transform(filter(StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token), StorageProxy::shouldHint),
+                  StorageService.instance::getHostIdForEndpoint);
+
+        write(hostIds, hint);
+    }
+
+    /**
      * Flush the buffer pool for the selected target nodes, then fsync their writers.
      *
      * @param hostIds host ids of the nodes to flush and fsync hints for

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8fa2082..5cebf27 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -71,6 +71,8 @@ import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;
 
+import static com.google.common.collect.Iterables.contains;
+
 public class StorageProxy implements StorageProxyMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
@@ -670,12 +672,10 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void hintMutation(Mutation mutation)
     {
-        Token tk = DatabaseDescriptor.getPartitioner().getToken(mutation.key().getKey());
-        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
-        Collection<InetAddress> pendingEndpoints =
-            StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
+        String keyspaceName = mutation.getKeyspaceName();
+        Token token = mutation.key().getToken();
 
-        Iterable<InetAddress> endpoints = Iterables.concat(naturalEndpoints, pendingEndpoints);
+        Iterable<InetAddress> endpoints = StorageService.instance.getNaturalAndPendingEndpoints(keyspaceName, token);
         ArrayList<InetAddress> endpointsToHint = new ArrayList<>(Iterables.size(endpoints));
 
         // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
@@ -687,6 +687,16 @@ public class StorageProxy implements StorageProxyMBean
         submitHint(mutation, endpointsToHint, null);
     }
 
+    public boolean appliesLocally(Mutation mutation)
+    {
+        String keyspaceName = mutation.getKeyspaceName();
+        Token token = mutation.key().getToken();
+        InetAddress local = FBUtilities.getBroadcastAddress();
+
+        return StorageService.instance.getNaturalEndpoints(keyspaceName, token).contains(local)
+               || StorageService.instance.getTokenMetadata().pendingEndpointsFor(token, keyspaceName).contains(local);
+    }
+
     /**
      * Use this method to have these Mutations applied
      * across all replicas.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 259bd10..4cdeeb0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3301,6 +3301,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     /**
+     * Returns the endpoints currently responsible for storing the token plus pending ones
+     */
+    public Iterable<InetAddress> getNaturalAndPendingEndpoints(String keyspaceName, Token token)
+    {
+        return Iterables.concat(getNaturalEndpoints(keyspaceName, token), tokenMetadata.pendingEndpointsFor(token, keyspaceName));
+    }
+
+    /**
      * This method attempts to return N endpoints that are responsible for storing the
      * specified key i.e for replication.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f38ef1/test/unit/org/apache/cassandra/hints/HintTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintTest.java b/test/unit/org/apache/cassandra/hints/HintTest.java
index 4c7ec70..1d486e1 100644
--- a/test/unit/org/apache/cassandra/hints/HintTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintTest.java
@@ -18,6 +18,11 @@
 package org.apache.cassandra.hints;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableList;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -26,17 +31,24 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableParams;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static junit.framework.Assert.*;
@@ -67,6 +79,12 @@ public class HintTest
     @Before
     public void resetGcGraceSeconds()
     {
+        TokenMetadata tokenMeta = StorageService.instance.getTokenMetadata();
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        tokenMeta.clearUnsafe();
+        tokenMeta.updateHostId(UUID.randomUUID(), local);
+        tokenMeta.updateNormalTokens(BootStrapper.getRandomTokens(tokenMeta, 1), local);
+
         for (CFMetaData table : Schema.instance.getTablesAndViews(KEYSPACE))
             table.gcGraceSeconds(TableParams.DEFAULT_GC_GRACE_SECONDS);
     }
@@ -185,6 +203,97 @@ public class HintTest
         assertNoPartitions(key, TABLE2);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testChangedTopology() throws Exception
+    {
+        // create a hint
+        long now = FBUtilities.timestampMicros();
+        String key = "testChangedTopology";
+        Mutation mutation = createMutation(key, now);
+        Hint hint = Hint.create(mutation, now / 1000);
+
+        // Prepare metadata with injected stale endpoint serving the mutation key.
+        TokenMetadata tokenMeta = StorageService.instance.getTokenMetadata();
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        InetAddress endpoint = InetAddress.getByName("1.1.1.1");
+        UUID localId = StorageService.instance.getLocalHostUUID();
+        UUID targetId = UUID.randomUUID();
+        tokenMeta.updateHostId(targetId, endpoint);
+        tokenMeta.updateNormalTokens(ImmutableList.of(mutation.key().getToken()), endpoint);
+
+        // sanity check that there is no data inside yet
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+
+        assert StorageProxy.instance.getHintsInProgress() == 0;
+        long totalHintCount = StorageProxy.instance.getTotalHints();
+        // Process hint message.
+        HintMessage message = new HintMessage(localId, hint);
+        MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb(
+                MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version),
+                -1);
+
+        // hint should not be applied as we no longer are a replica
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+
+        // Attempt to send to new endpoint should have been made. Node is not live hence it should now be a hint.
+        assertEquals(totalHintCount + 1, StorageProxy.instance.getTotalHints());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testChangedTopologyNotHintable() throws Exception
+    {
+        // create a hint
+        long now = FBUtilities.timestampMicros();
+        String key = "testChangedTopology";
+        Mutation mutation = createMutation(key, now);
+        Hint hint = Hint.create(mutation, now / 1000);
+
+        // Prepare metadata with injected stale endpoint.
+        TokenMetadata tokenMeta = StorageService.instance.getTokenMetadata();
+        InetAddress local = FBUtilities.getBroadcastAddress();
+        InetAddress endpoint = InetAddress.getByName("1.1.1.1");
+        UUID localId = StorageService.instance.getLocalHostUUID();
+        UUID targetId = UUID.randomUUID();
+        tokenMeta.updateHostId(targetId, endpoint);
+        tokenMeta.updateNormalTokens(ImmutableList.of(mutation.key().getToken()), endpoint);
+
+        // sanity check that there is no data inside yet
+        assertNoPartitions(key, TABLE0);
+        assertNoPartitions(key, TABLE1);
+        assertNoPartitions(key, TABLE2);
+
+        try
+        {
+            DatabaseDescriptor.setHintedHandoffEnabled(false);
+
+            assert StorageMetrics.totalHintsInProgress.getCount() == 0;
+            long totalHintCount = StorageMetrics.totalHints.getCount();
+            // Process hint message.
+            HintMessage message = new HintMessage(localId, hint);
+            MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb(
+                    MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version),
+                    -1);
+
+            // hint should not be applied as we no longer are a replica
+            assertNoPartitions(key, TABLE0);
+            assertNoPartitions(key, TABLE1);
+            assertNoPartitions(key, TABLE2);
+
+            // Attempt to send to new endpoint should not have been made.
+            assertEquals(totalHintCount, StorageMetrics.totalHints.getCount());
+        }
+        finally
+        {
+            DatabaseDescriptor.setHintedHandoffEnabled(true);
+        }
+    }
+
     private static Mutation createMutation(String key, long now)
     {
         Mutation mutation = new Mutation(KEYSPACE, dk(key));