You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/12/04 21:54:28 UTC

[1/3] cassandra git commit: Avoid MV race during node decommission

Repository: cassandra
Updated Branches:
  refs/heads/trunk 646da9aa8 -> 9fb436f93


Avoid MV race during node decommission

Patch by Paulo Motta; reviewed by Joel Knighton for CASSANDRA-10674


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c184e8c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c184e8c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c184e8c1

Branch: refs/heads/trunk
Commit: c184e8c14b28eddc20cbdd098f5e47d1ed832898
Parents: a4da379
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Nov 25 14:50:31 2015 -0800
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Dec 4 15:50:48 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/view/ViewUtils.java | 25 ++------
 .../apache/cassandra/service/StorageProxy.java  | 67 ++++++++++++--------
 .../apache/cassandra/db/view/ViewUtilsTest.java | 42 ++++++++++--
 4 files changed, 88 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f3f182..b95aa76 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.1
+ * Avoid MV race during node decommission (CASSANDRA-10674)
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Handle single-column deletions correction in materialized views
    when the column is part of the view primary key (CASSANDRA-10796)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index 089a3b7..4d9517f 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -21,13 +21,13 @@ package org.apache.cassandra.db.view;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 public final class ViewUtils
@@ -56,9 +56,9 @@ public final class ViewUtils
      *  B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
      *  C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
      *
-     * @throws RuntimeException if this method is called using a base token which does not belong to this replica
+     * @return Optional.empty() if this method is called using a base token which does not belong to this replica
      */
-    public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+    public static Optional<InetAddress> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
     {
         AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
 
@@ -77,7 +77,7 @@ public final class ViewUtils
         {
             // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
             if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
-                return viewEndpoint;
+                return Optional.of(viewEndpoint);
 
             // We have to remove any endpoint which is shared between the base and the view, as it will select itself
             // and throw off the counts otherwise.
@@ -95,20 +95,9 @@ public final class ViewUtils
         int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
 
         if (baseIdx < 0)
-        {
-
-            if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0)
-            {
-                //Since there are pending endpoints we are going to write to the batchlog regardless.
-                //So we can pretend we are the views endpoint.
-
-                return FBUtilities.getBroadcastAddress();
-            }
-
-            throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
-        }
-
+            //This node is not a base replica of this key, so we return empty
+            return Optional.empty();
 
-        return viewEndpoints.get(baseIdx);
+        return Optional.of(viewEndpoints.get(baseIdx));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 15be7c6..397b8b9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -670,12 +670,12 @@ public class StorageProxy implements StorageProxyMBean
             if (StorageService.instance.isStarting() || StorageService.instance.isJoining() || StorageService.instance.isMoving())
             {
                 BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(),
-                                                        mutations),
-                                      writeCommitLog);
+                                                        mutations), writeCommitLog);
             }
             else
             {
                 List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
+                List<Mutation> nonPairedMutations = new LinkedList<>();
                 Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
 
                 ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
@@ -684,40 +684,51 @@ public class StorageProxy implements StorageProxyMBean
                 final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
                 BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
                                                                                                               () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
-
                 // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
                 for (Mutation mutation : mutations)
                 {
                     String keyspaceName = mutation.getKeyspaceName();
                     Token tk = mutation.key().getToken();
-                    InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
-                    List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
-
-                    WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation,
-                                                                                       consistencyLevel,
-                                                                                       consistencyLevel,
-                                                                                       naturalEndpoints,
-                                                                                       baseComplete,
-                                                                                       WriteType.BATCH,
-                                                                                       cleanup);
-
-                    // When local node is the endpoint and there are no pending nodes we can
-                    // Just apply the mutation locally.
-                    if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty() && StorageService.instance.isJoined())
+                    Optional<InetAddress> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                    Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
+
+                    if (pairedEndpoint.isPresent())
                     {
-                        try
-                        {
-                            mutation.apply(writeCommitLog);
-                        }
-                        catch (Exception exc)
+                        // When local node is the endpoint and there are no pending nodes we can
+                        // Just apply the mutation locally.
+                        if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress())
+                            && pendingEndpoints.isEmpty() && StorageService.instance.isJoined())
+                            try
+                            {
+                                mutation.apply(writeCommitLog);
+                            }
+                            catch (Exception exc)
+                            {
+                                logger.error("Error applying local view update to keyspace {}: {}", mutation.getKeyspaceName(), mutation);
+                                throw exc;
+                            }
+                        else
                         {
-                            logger.error("Error applying local view update to keyspace {}: {}", mutation.getKeyspaceName(), mutation);
-                            throw exc;
+                            wrappers.add(wrapViewBatchResponseHandler(mutation,
+                                                                      consistencyLevel,
+                                                                      consistencyLevel,
+                                                                      Collections.singletonList(pairedEndpoint.get()),
+                                                                      baseComplete,
+                                                                      WriteType.BATCH,
+                                                                      cleanup));
                         }
                     }
                     else
                     {
-                        wrappers.add(wrapper);
+                        //if there are no paired endpoints there are probably range movements going on,
+                        //so we write to the local batchlog to replay later
+                        if (pendingEndpoints.isEmpty())
+                            logger.warn("Received base materialized view mutation for key %s that does not belong " +
+                                        "to this node. There is probably a range movement happening (move or decommission)," +
+                                        "but this node hasn't updated its ring metadata yet. Adding mutation to " +
+                                        "local batchlog to be replayed later.",
+                                        mutation.key());
+                        nonPairedMutations.add(mutation);
                     }
                 }
 
@@ -730,6 +741,12 @@ public class StorageProxy implements StorageProxyMBean
                     // now actually perform the writes and wait for them to complete
                     asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
                 }
+
+                if (!nonPairedMutations.isEmpty())
+                {
+                    BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonPairedMutations),
+                                          writeCommitLog);
+                }
             }
         }
         finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index 8fd0cfb..c238f36 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.view;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -74,11 +75,12 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
         Schema.instance.setKeyspaceMetadata(meta);
 
-        InetAddress naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
                                                                        new StringToken("CA"),
                                                                        new StringToken("BB"));
 
-        Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint);
+        Assert.assertTrue(naturalEndpoint.isPresent());
+        Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint.get());
     }
 
 
@@ -106,10 +108,42 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
         Schema.instance.setKeyspaceMetadata(meta);
 
-        InetAddress naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
                                                                        new StringToken("CA"),
                                                                        new StringToken("BB"));
 
-        Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint);
+        Assert.assertTrue(naturalEndpoint.isPresent());
+        Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint.get());
+    }
+
+    @Test
+    public void testBaseTokenDoesNotBelongToLocalReplicaShouldReturnEmpty() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+        Map<String, String> replicationMap = new HashMap<>();
+        replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
+
+        replicationMap.put("DC1", "1");
+        replicationMap.put("DC2", "1");
+
+        Keyspace.clear("Keyspace1");
+        KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap));
+        Schema.instance.setKeyspaceMetadata(meta);
+
+        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+                                                                       new StringToken("AB"),
+                                                                       new StringToken("BB"));
+
+        Assert.assertFalse(naturalEndpoint.isPresent());
     }
 }


[3/3] cassandra git commit: Merge branch 'cassandra-3.1' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-3.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9fb436f9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9fb436f9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9fb436f9

Branch: refs/heads/trunk
Commit: 9fb436f936d9dc9740273b147c9c4a960638affc
Parents: 646da9a ee46dfb
Author: T Jake Luciani <ja...@apache.org>
Authored: Fri Dec 4 15:54:15 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Dec 4 15:54:15 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/view/ViewUtils.java | 25 ++------
 .../apache/cassandra/service/StorageProxy.java  | 67 ++++++++++++--------
 .../apache/cassandra/db/view/ViewUtilsTest.java | 42 ++++++++++--
 4 files changed, 88 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fb436f9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9a0e5ce,a2e9434..0511ed4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,6 +1,21 @@@
 +3.2
 + * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
 + * Normalize all scripts (CASSANDRA-10679)
 + * Make compression ratio much more accurate (CASSANDRA-10225)
 + * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
 + * Make index building pluggable (CASSANDRA-10681)
 + * Add sstable flush observer (CASSANDRA-10678)
 + * Improve NTS endpoints calculation (CASSANDRA-10200)
 + * Improve performance of the folderSize function (CASSANDRA-10677)
 + * Add support for type casting in selection clause (CASSANDRA-10310)
 + * Added graphing option to cassandra-stress (CASSANDRA-7918)
 + * Abort in-progress queries that time out (CASSANDRA-7392)
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +
 +
  3.1
  Merged from 3.0:
+  * Avoid MV race during node decommission (CASSANDRA-10674)
   * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
   * Handle single-column deletions correction in materialized views
     when the column is part of the view primary key (CASSANDRA-10796)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fb436f9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------


[2/3] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1

Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee46dfbf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee46dfbf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee46dfbf

Branch: refs/heads/trunk
Commit: ee46dfbf934fc91d8d4865bef174b68174790edd
Parents: 2b8cdc2 c184e8c
Author: T Jake Luciani <ja...@apache.org>
Authored: Fri Dec 4 15:53:44 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Dec 4 15:53:44 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/view/ViewUtils.java | 25 ++------
 .../apache/cassandra/service/StorageProxy.java  | 67 ++++++++++++--------
 .../apache/cassandra/db/view/ViewUtilsTest.java | 42 ++++++++++--
 4 files changed, 88 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee46dfbf/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e06653f,b95aa76..a2e9434
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,5 -1,5 +1,6 @@@
 -3.0.1
 +3.1
 +Merged from 3.0:
+  * Avoid MV race during node decommission (CASSANDRA-10674)
   * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
   * Handle single-column deletions correction in materialized views
     when the column is part of the view primary key (CASSANDRA-10796)