You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2016/12/15 19:15:27 UTC
[1/2] cassandra git commit: Retry acquire MV lock on failure instead
of throwing WTE on streaming
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 9fc1ffb63 -> 48abc0369
Retry acquire MV lock on failure instead of throwing WTE on streaming
Patch by Benjamin Roth; Reviewed by Paulo Motta for CASSANDRA-12905
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3faa0d92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3faa0d92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3faa0d92
Branch: refs/heads/cassandra-3.0
Commit: 3faa0d925791be085b92949a0a0ec20f7e6ae368
Parents: 9fc1ffb
Author: brstgt <br...@googlemail.com>
Authored: Thu Dec 15 12:42:31 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:46:00 2016 -0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 92 +++++++++++---------
src/java/org/apache/cassandra/db/Mutation.java | 17 ++--
.../db/commitlog/CommitLogReplayer.java | 10 +--
.../cassandra/service/paxos/PaxosState.java | 9 +-
.../cassandra/streaming/StreamReceiveTask.java | 5 +-
6 files changed, 63 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e69bf08..63e095d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
* Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
* Mark MVs as built after successful bootstrap (CASSANDRA-12984)
* Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ec5102b..3715995 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.Lock;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -50,8 +52,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* It represents a Keyspace.
@@ -62,7 +62,7 @@ public class Keyspace
private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty();
- private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count", "0"), 0);
+ private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger("cassandra.test.fail_mv_locks_count", 0);
public final KeyspaceMetrics metric;
@@ -379,42 +379,40 @@ public class Keyspace
}
}
- public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog)
- {
- return apply(mutation, writeCommitLog, true, false, null);
- }
-
- /**
- * Should be used if caller is blocking and runs in mutation stage.
- * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
- * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
- *
- * @param mutation
- * @param writeCommitLog
- * @return
- */
- public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean writeCommitLog)
+ public CompletableFuture<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
- return apply(mutation, writeCommitLog, true, false, false, null);
+ return apply(mutation, writeCommitLog, updateIndexes, true, true, null);
}
- public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+ public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
- return apply(mutation, writeCommitLog, updateIndexes, false, null);
+ apply(mutation, writeCommitLog, updateIndexes, true);
}
- public CompletableFuture<?> applyFromCommitLog(Mutation mutation)
+ public void apply(final Mutation mutation,
+ final boolean writeCommitLog)
{
- return apply(mutation, false, true, true, null);
+ apply(mutation, writeCommitLog, true, true);
}
- public CompletableFuture<?> apply(final Mutation mutation,
- final boolean writeCommitLog,
- boolean updateIndexes,
- boolean isClReplay,
- CompletableFuture<?> future)
+ /**
+ * If apply is blocking, apply must not be deferred
+ * Otherwise there is a race condition where ALL mutation workers are being blocked ending
+ * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
+ *
+ * @param mutation the row to write. Must not be modified after calling apply, since commitlog append
+ * may happen concurrently, depending on the CL Executor type.
+ * @param writeCommitLog false to disable commitlog append entirely
+ * @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
+ * @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
+ * @throws ExecutionException
+ */
+ public void apply(final Mutation mutation,
+ final boolean writeCommitLog,
+ boolean updateIndexes,
+ boolean isDroppable)
{
- return apply(mutation, writeCommitLog, updateIndexes, isClReplay, true, future);
+ apply(mutation, writeCommitLog, updateIndexes, isDroppable, false, null);
}
/**
@@ -424,13 +422,13 @@ public class Keyspace
* may happen concurrently, depending on the CL Executor type.
* @param writeCommitLog false to disable commitlog append entirely
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
- * @param isClReplay true if caller is the commitlog replayer
+ * @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
* @param isDeferrable true if caller is not waiting for future to complete, so that future may be deferred
*/
- public CompletableFuture<?> apply(final Mutation mutation,
+ private CompletableFuture<?> apply(final Mutation mutation,
final boolean writeCommitLog,
boolean updateIndexes,
- boolean isClReplay,
+ boolean isDroppable,
boolean isDeferrable,
CompletableFuture<?> future)
{
@@ -438,7 +436,11 @@ public class Keyspace
throw new RuntimeException("Testing write failures");
boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
- final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
+
+ // If apply is not deferrable, no future is required and null is always returned
+ if (isDeferrable && future == null) {
+ future = new CompletableFuture<>();
+ }
Lock lock = null;
if (requiresViewUpdate)
@@ -453,15 +455,15 @@ public class Keyspace
if (lock == null)
{
- // avoid throwing a WTE during commitlog replay
- if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+ //throw WTE only if request is droppable
+ if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
{
logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
Tracing.trace("Could not acquire MV lock");
if (future != null)
{
future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
- return mark;
+ return future;
}
else
{
@@ -472,11 +474,12 @@ public class Keyspace
{
//This view update can't happen right now. so rather than keep this thread busy
// we will re-apply ourself to the queue and try again later
+ final CompletableFuture<?> mark = future;
StageManager.getStage(Stage.MUTATION).execute(() ->
- apply(mutation, writeCommitLog, true, isClReplay, mark)
+ apply(mutation, writeCommitLog, true, isDroppable, true, mark)
);
- return mark;
+ return future;
}
else
{
@@ -499,7 +502,9 @@ public class Keyspace
else
{
long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
- if (!isClReplay)
+ // Metrics are only collected for droppable write operations
+ // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
+ if (isDroppable)
{
for (UUID cfid : mutation.getColumnFamilyIds())
columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
@@ -534,7 +539,7 @@ public class Keyspace
try
{
Tracing.trace("Creating materialized view mutations from base table replica");
- viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog && !isClReplay, baseComplete);
+ viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog, baseComplete);
}
catch (Throwable t)
{
@@ -553,8 +558,11 @@ public class Keyspace
if (requiresViewUpdate)
baseComplete.set(System.currentTimeMillis());
}
- mark.complete(null);
- return mark;
+
+ if (future != null) {
+ future.complete(null);
+ }
+ return future;
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 2955677..7ed69c0 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -202,20 +202,17 @@ public class Mutation implements IMutation
public CompletableFuture<?> applyFuture()
{
Keyspace ks = Keyspace.open(keyspaceName);
- return ks.apply(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites);
+ return ks.applyFuture(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites, true);
+ }
+
+ public void apply(boolean durableWrites, boolean isDroppable)
+ {
+ Keyspace.open(keyspaceName).apply(this, durableWrites, true, isDroppable);
}
public void apply(boolean durableWrites)
{
- try
- {
- Keyspace ks = Keyspace.open(keyspaceName);
- Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, durableWrites));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
+ apply(durableWrites, true);
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index af8efb4..d53f0f8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -642,15 +642,7 @@ public class CommitLogReplayer
{
assert !newMutation.isEmpty();
- try
- {
- Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
-
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
keyspacesRecovered.add(keyspace);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0940950..ee1ba6a 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -147,14 +147,7 @@ public class PaxosState
{
Tracing.trace("Committing proposal {}", proposal);
Mutation mutation = proposal.makeMutation();
- try
- {
- Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation, true));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 040906b..b6b8387 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -178,14 +178,15 @@ public class StreamReceiveTask extends StreamTask
{
for (SSTableReader reader : readers)
{
+ Keyspace ks = Keyspace.open(reader.getKeyspaceName());
try (ISSTableScanner scanner = reader.getScanner())
{
while (scanner.hasNext())
{
try (UnfilteredRowIterator rowIterator = scanner.next())
{
- //Apply unsafe (we will flush below before transaction is done)
- new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ // MV *can* be applied unsafe as we flush below before transaction is done.
+ ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
}
}
}
[2/2] cassandra git commit: Make hint delivery async so MV hint is
deferred on failure to acquire lock
Posted by pa...@apache.org.
Make hint delivery async so MV hint is deferred on failure to acquire lock
Patch by Paulo Motta; Reviewed by Benjamin Roth for CASSANDRA-12905
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48abc036
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48abc036
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48abc036
Branch: refs/heads/cassandra-3.0
Commit: 48abc0369799acca0521150e2c88d4032e01c3b5
Parents: 3faa0d9
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Dec 15 12:49:38 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:46:05 2016 -0200
----------------------------------------------------------------------
src/java/org/apache/cassandra/hints/Hint.java | 40 ++++++++++++++------
.../apache/cassandra/hints/HintVerbHandler.java | 4 +-
2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index cbb5e74..17fbf5d 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -19,8 +19,12 @@ package org.apache.cassandra.hints;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Throwables;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -68,7 +72,7 @@ public final class Hint
return new Hint(mutation, creationTime, mutation.smallestGCGS());
}
- /**
+ /*
* @param mutation the hinted mutation
* @param creationTime time of this hint's creation (in milliseconds since epoch)
* @param gcgs the smallest gcgs of all tables involved at the time of hint creation (in seconds)
@@ -81,19 +85,33 @@ public final class Hint
/**
* Applies the contained mutation unless it's expired, filtering out any updates for truncated tables
*/
- void apply()
+ CompletableFuture<?> applyFuture()
{
- if (!isLive())
- return;
+ if (isLive())
+ {
+ // filter out partition updates for tables that have been truncated since the hint's creation
+ Mutation filtered = mutation;
+ for (UUID id : mutation.getColumnFamilyIds())
+ if (creationTime <= SystemKeyspace.getTruncatedAt(id))
+ filtered = filtered.without(id);
- // filter out partition update for table that have been truncated since hint's creation
- Mutation filtered = mutation;
- for (UUID id : mutation.getColumnFamilyIds())
- if (creationTime <= SystemKeyspace.getTruncatedAt(id))
- filtered = filtered.without(id);
+ if (!filtered.isEmpty())
+ return filtered.applyFuture();
+ }
+
+ return CompletableFuture.completedFuture(null);
+ }
- if (!filtered.isEmpty())
- filtered.apply();
+ void apply()
+ {
+ try
+ {
+ applyFuture().get();
+ }
+ catch (Exception e)
+ {
+ throw Throwables.propagate(e.getCause());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
index d8838a9..abcd1f9 100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -88,10 +88,8 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage>
else
{
// the common path - the node is both the destination and a valid replica for the hint.
- hint.apply();
+ hint.applyFuture().thenAccept(o -> reply(id, message.from)).exceptionally(e -> {logger.debug("Failed to apply hint", e); return null;});
}
-
- reply(id, message.from);
}
private static void reply(int id, InetAddress to)