You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2016/12/15 19:15:27 UTC

[1/2] cassandra git commit: Retry acquire MV lock on failure instead of throwing WTE on streaming

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 9fc1ffb63 -> 48abc0369


Retry acquire MV lock on failure instead of throwing WTE on streaming

Patch by Benjamin Roth; Reviewed by Paulo Motta for CASSANDRA-12905


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3faa0d92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3faa0d92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3faa0d92

Branch: refs/heads/cassandra-3.0
Commit: 3faa0d925791be085b92949a0a0ec20f7e6ae368
Parents: 9fc1ffb
Author: brstgt <br...@googlemail.com>
Authored: Thu Dec 15 12:42:31 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:46:00 2016 -0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 92 +++++++++++---------
 src/java/org/apache/cassandra/db/Mutation.java  | 17 ++--
 .../db/commitlog/CommitLogReplayer.java         | 10 +--
 .../cassandra/service/paxos/PaxosState.java     |  9 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  5 +-
 6 files changed, 63 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e69bf08..63e095d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
  * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
  * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
  * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ec5102b..3715995 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -50,8 +52,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * It represents a Keyspace.
@@ -62,7 +62,7 @@ public class Keyspace
 
     private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
     private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty();
-    private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count", "0"), 0);
+    private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger("cassandra.test.fail_mv_locks_count", 0);
 
     public final KeyspaceMetrics metric;
 
@@ -379,42 +379,40 @@ public class Keyspace
         }
     }
 
-    public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog)
-    {
-        return apply(mutation, writeCommitLog, true, false, null);
-    }
-
-    /**
-     * Should be used if caller is blocking and runs in mutation stage.
-     * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
-     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
-     *
-     * @param mutation
-     * @param writeCommitLog
-     * @return
-     */
-    public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean writeCommitLog)
+    public CompletableFuture<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        return apply(mutation, writeCommitLog, true, false, false, null);
+        return apply(mutation, writeCommitLog, updateIndexes, true, true, null);
     }
 
-    public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+    public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        return apply(mutation, writeCommitLog, updateIndexes, false, null);
+        apply(mutation, writeCommitLog, updateIndexes, true);
     }
 
-    public CompletableFuture<?> applyFromCommitLog(Mutation mutation)
+    public void apply(final Mutation mutation,
+                      final boolean writeCommitLog)
     {
-        return apply(mutation, false, true, true, null);
+        apply(mutation, writeCommitLog, true, true);
     }
 
-    public CompletableFuture<?> apply(final Mutation mutation,
-                                      final boolean writeCommitLog,
-                                      boolean updateIndexes,
-                                      boolean isClReplay,
-                                      CompletableFuture<?> future)
+    /**
+     * If apply is blocking, apply must not be deferred
+     * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
+     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
+     *
+     * @param mutation       the row to write.  Must not be modified after calling apply, since commitlog append
+     *                       may happen concurrently, depending on the CL Executor type.
+     * @param writeCommitLog false to disable commitlog append entirely
+     * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
+     * @param isDroppable    true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
+     * @throws ExecutionException
+     */
+    public void apply(final Mutation mutation,
+                      final boolean writeCommitLog,
+                      boolean updateIndexes,
+                      boolean isDroppable)
     {
-        return apply(mutation, writeCommitLog, updateIndexes, isClReplay, true, future);
+        apply(mutation, writeCommitLog, updateIndexes, isDroppable, false, null);
     }
 
     /**
@@ -424,13 +422,13 @@ public class Keyspace
      *                       may happen concurrently, depending on the CL Executor type.
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
-     * @param isClReplay     true if caller is the commitlog replayer
+     * @param isDroppable    true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
      * @param isDeferrable   true if caller is not waiting for future to complete, so that future may be deferred
      */
-    public CompletableFuture<?> apply(final Mutation mutation,
+    private CompletableFuture<?> apply(final Mutation mutation,
                                       final boolean writeCommitLog,
                                       boolean updateIndexes,
-                                      boolean isClReplay,
+                                      boolean isDroppable,
                                       boolean isDeferrable,
                                       CompletableFuture<?> future)
     {
@@ -438,7 +436,11 @@ public class Keyspace
             throw new RuntimeException("Testing write failures");
 
         boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
-        final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
+
+        // If apply is not deferrable, no future is required, returns always null
+        if (isDeferrable && future == null) {
+            future = new CompletableFuture<>();
+        }
 
         Lock lock = null;
         if (requiresViewUpdate)
@@ -453,15 +455,15 @@ public class Keyspace
 
                 if (lock == null)
                 {
-                    // avoid throwing a WTE during commitlog replay
-                    if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                    //throw WTE only if request is droppable
+                    if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
                     {
                         logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
                         Tracing.trace("Could not acquire MV lock");
                         if (future != null)
                         {
                             future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
-                            return mark;
+                            return future;
                         }
                         else
                         {
@@ -472,11 +474,12 @@ public class Keyspace
                     {
                         //This view update can't happen right now. so rather than keep this thread busy
                         // we will re-apply ourself to the queue and try again later
+                        final CompletableFuture<?> mark = future;
                         StageManager.getStage(Stage.MUTATION).execute(() ->
-                                apply(mutation, writeCommitLog, true, isClReplay, mark)
+                                apply(mutation, writeCommitLog, true, isDroppable, true, mark)
                         );
 
-                        return mark;
+                        return future;
                     }
                     else
                     {
@@ -499,7 +502,9 @@ public class Keyspace
                 else
                 {
                     long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
-                    if (!isClReplay)
+                    // Metrics are only collected for droppable write operations
+                    // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
+                    if (isDroppable)
                     {
                         for (UUID cfid : mutation.getColumnFamilyIds())
                             columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
@@ -534,7 +539,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Creating materialized view mutations from base table replica");
-                        viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog && !isClReplay, baseComplete);
+                        viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog, baseComplete);
                     }
                     catch (Throwable t)
                     {
@@ -553,8 +558,11 @@ public class Keyspace
                 if (requiresViewUpdate)
                     baseComplete.set(System.currentTimeMillis());
             }
-            mark.complete(null);
-            return mark;
+
+            if (future != null) {
+                future.complete(null);
+            }
+            return future;
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 2955677..7ed69c0 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -202,20 +202,17 @@ public class Mutation implements IMutation
     public CompletableFuture<?> applyFuture()
     {
         Keyspace ks = Keyspace.open(keyspaceName);
-        return ks.apply(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites);
+        return ks.applyFuture(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites, true);
+    }
+
+    public void apply(boolean durableWrites, boolean isDroppable)
+    {
+        Keyspace.open(keyspaceName).apply(this, durableWrites, true, isDroppable);
     }
 
     public void apply(boolean durableWrites)
     {
-        try
-        {
-            Keyspace ks = Keyspace.open(keyspaceName);
-            Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, durableWrites));
-        }
-        catch (ExecutionException e)
-        {
-            throw Throwables.propagate(e.getCause());
-        }
+        apply(durableWrites, true);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index af8efb4..d53f0f8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -642,15 +642,7 @@ public class CommitLogReplayer
                 {
                     assert !newMutation.isEmpty();
 
-                    try
-                    {
-                        Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw Throwables.propagate(e.getCause());
-                    }
-
+                    Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
                     keyspacesRecovered.add(keyspace);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0940950..ee1ba6a 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -147,14 +147,7 @@ public class PaxosState
             {
                 Tracing.trace("Committing proposal {}", proposal);
                 Mutation mutation = proposal.makeMutation();
-                try
-                {
-                    Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation, true));
-                }
-                catch (ExecutionException e)
-                {
-                    throw Throwables.propagate(e.getCause());
-                }
+                Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 040906b..b6b8387 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -178,14 +178,15 @@ public class StreamReceiveTask extends StreamTask
                     {
                         for (SSTableReader reader : readers)
                         {
+                            Keyspace ks = Keyspace.open(reader.getKeyspaceName());
                             try (ISSTableScanner scanner = reader.getScanner())
                             {
                                 while (scanner.hasNext())
                                 {
                                     try (UnfilteredRowIterator rowIterator = scanner.next())
                                     {
-                                        //Apply unsafe (we will flush below before transaction is done)
-                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+                                        // MV *can* be applied unsafe as we flush below before transaction is done.
+                                        ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
                                     }
                                 }
                             }


[2/2] cassandra git commit: Make hint delivery async so MV hint is deferred on failure to acquire lock

Posted by pa...@apache.org.
Make hint delivery async so MV hint is deferred on failure to acquire lock

Patch by Paulo Motta; Reviewed by Benjamin Roth for CASSANDRA-12905


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48abc036
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48abc036
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48abc036

Branch: refs/heads/cassandra-3.0
Commit: 48abc0369799acca0521150e2c88d4032e01c3b5
Parents: 3faa0d9
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Dec 15 12:49:38 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:46:05 2016 -0200

----------------------------------------------------------------------
 src/java/org/apache/cassandra/hints/Hint.java   | 40 ++++++++++++++------
 .../apache/cassandra/hints/HintVerbHandler.java |  4 +-
 2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index cbb5e74..17fbf5d 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -19,8 +19,12 @@ package org.apache.cassandra.hints;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Throwables;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -68,7 +72,7 @@ public final class Hint
         return new Hint(mutation, creationTime, mutation.smallestGCGS());
     }
 
-    /**
+    /*
      * @param mutation the hinted mutation
      * @param creationTime time of this hint's creation (in milliseconds since epoch)
      * @param gcgs the smallest gcgs of all tables involved at the time of hint creation (in seconds)
@@ -81,19 +85,33 @@ public final class Hint
     /**
      * Applies the contained mutation unless it's expired, filtering out any updates for truncated tables
      */
-    void apply()
+    CompletableFuture<?> applyFuture()
     {
-        if (!isLive())
-            return;
+        if (isLive())
+        {
+            // filter out partition update for table that have been truncated since hint's creation
+            Mutation filtered = mutation;
+            for (UUID id : mutation.getColumnFamilyIds())
+                if (creationTime <= SystemKeyspace.getTruncatedAt(id))
+                    filtered = filtered.without(id);
 
-        // filter out partition update for table that have been truncated since hint's creation
-        Mutation filtered = mutation;
-        for (UUID id : mutation.getColumnFamilyIds())
-            if (creationTime <= SystemKeyspace.getTruncatedAt(id))
-                filtered = filtered.without(id);
+            if (!filtered.isEmpty())
+                return filtered.applyFuture();
+        }
+
+        return CompletableFuture.completedFuture(null);
+    }
 
-        if (!filtered.isEmpty())
-            filtered.apply();
+    void apply()
+    {
+        try
+        {
+            applyFuture().get();
+        }
+        catch (Exception e)
+        {
+            throw Throwables.propagate(e.getCause());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
index d8838a9..abcd1f9 100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -88,10 +88,8 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage>
         else
         {
             // the common path - the node is both the destination and a valid replica for the hint.
-            hint.apply();
+            hint.applyFuture().thenAccept(o -> reply(id, message.from)).exceptionally(e -> {logger.debug("Failed to apply hint", e); return null;});
         }
-
-        reply(id, message.from);
     }
 
     private static void reply(int id, InetAddress to)