You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@cassandra.apache.org by pa...@apache.org on 2016/12/15 19:17:39 UTC
[1/4] cassandra git commit: Retry acquire MV lock on failure instead of throwing WTE on streaming
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.X e5a77cfea -> b1e23ab54
Retry acquire MV lock on failure instead of throwing WTE on streaming
Patch by Benjamin Roth; Reviewed by Paulo Motta for CASSANDRA-12905
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3faa0d92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3faa0d92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3faa0d92
Branch: refs/heads/cassandra-3.X
Commit: 3faa0d925791be085b92949a0a0ec20f7e6ae368
Parents: 9fc1ffb
Author: brstgt <br...@googlemail.com>
Authored: Thu Dec 15 12:42:31 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:46:00 2016 -0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 92 +++++++++++---------
src/java/org/apache/cassandra/db/Mutation.java | 17 ++--
.../db/commitlog/CommitLogReplayer.java | 10 +--
.../cassandra/service/paxos/PaxosState.java | 9 +-
.../cassandra/streaming/StreamReceiveTask.java | 5 +-
6 files changed, 63 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e69bf08..63e095d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
* Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
* Mark MVs as built after successful bootstrap (CASSANDRA-12984)
* Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ec5102b..3715995 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.Lock;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -50,8 +52,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* It represents a Keyspace.
@@ -62,7 +62,7 @@ public class Keyspace
private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty();
- private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count", "0"), 0);
+ private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger("cassandra.test.fail_mv_locks_count", 0);
public final KeyspaceMetrics metric;
@@ -379,42 +379,40 @@ public class Keyspace
}
}
- public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog)
- {
- return apply(mutation, writeCommitLog, true, false, null);
- }
-
- /**
- * Should be used if caller is blocking and runs in mutation stage.
- * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
- * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
- *
- * @param mutation
- * @param writeCommitLog
- * @return
- */
- public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean writeCommitLog)
+ public CompletableFuture<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
- return apply(mutation, writeCommitLog, true, false, false, null);
+ return apply(mutation, writeCommitLog, updateIndexes, true, true, null);
}
- public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+ public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
- return apply(mutation, writeCommitLog, updateIndexes, false, null);
+ apply(mutation, writeCommitLog, updateIndexes, true);
}
- public CompletableFuture<?> applyFromCommitLog(Mutation mutation)
+ public void apply(final Mutation mutation,
+ final boolean writeCommitLog)
{
- return apply(mutation, false, true, true, null);
+ apply(mutation, writeCommitLog, true, true);
}
- public CompletableFuture<?> apply(final Mutation mutation,
- final boolean writeCommitLog,
- boolean updateIndexes,
- boolean isClReplay,
- CompletableFuture<?> future)
+ /**
+ * If apply is blocking, apply must not be deferred
+ * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
+ * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
+ *
+ * @param mutation the row to write. Must not be modified after calling apply, since commitlog append
+ * may happen concurrently, depending on the CL Executor type.
+ * @param writeCommitLog false to disable commitlog append entirely
+ * @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
+ * @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
+ * @throws ExecutionException
+ */
+ public void apply(final Mutation mutation,
+ final boolean writeCommitLog,
+ boolean updateIndexes,
+ boolean isDroppable)
{
- return apply(mutation, writeCommitLog, updateIndexes, isClReplay, true, future);
+ apply(mutation, writeCommitLog, updateIndexes, isDroppable, false, null);
}
/**
@@ -424,13 +422,13 @@ public class Keyspace
* may happen concurrently, depending on the CL Executor type.
* @param writeCommitLog false to disable commitlog append entirely
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
- * @param isClReplay true if caller is the commitlog replayer
+ * @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
* @param isDeferrable true if caller is not waiting for future to complete, so that future may be deferred
*/
- public CompletableFuture<?> apply(final Mutation mutation,
+ private CompletableFuture<?> apply(final Mutation mutation,
final boolean writeCommitLog,
boolean updateIndexes,
- boolean isClReplay,
+ boolean isDroppable,
boolean isDeferrable,
CompletableFuture<?> future)
{
@@ -438,7 +436,11 @@ public class Keyspace
throw new RuntimeException("Testing write failures");
boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
- final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
+
+ // If apply is not deferrable, no future is required, returns always null
+ if (isDeferrable && future == null) {
+ future = new CompletableFuture<>();
+ }
Lock lock = null;
if (requiresViewUpdate)
@@ -453,15 +455,15 @@ public class Keyspace
if (lock == null)
{
- // avoid throwing a WTE during commitlog replay
- if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+ //throw WTE only if request is droppable
+ if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
{
logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
Tracing.trace("Could not acquire MV lock");
if (future != null)
{
future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
- return mark;
+ return future;
}
else
{
@@ -472,11 +474,12 @@ public class Keyspace
{
//This view update can't happen right now. so rather than keep this thread busy
// we will re-apply ourself to the queue and try again later
+ final CompletableFuture<?> mark = future;
StageManager.getStage(Stage.MUTATION).execute(() ->
- apply(mutation, writeCommitLog, true, isClReplay, mark)
+ apply(mutation, writeCommitLog, true, isDroppable, true, mark)
);
- return mark;
+ return future;
}
else
{
@@ -499,7 +502,9 @@ public class Keyspace
else
{
long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
- if (!isClReplay)
+ // Metrics are only collected for droppable write operations
+ // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
+ if (isDroppable)
{
for (UUID cfid : mutation.getColumnFamilyIds())
columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
@@ -534,7 +539,7 @@ public class Keyspace
try
{
Tracing.trace("Creating materialized view mutations from base table replica");
- viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog && !isClReplay, baseComplete);
+ viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog, baseComplete);
}
catch (Throwable t)
{
@@ -553,8 +558,11 @@ public class Keyspace
if (requiresViewUpdate)
baseComplete.set(System.currentTimeMillis());
}
- mark.complete(null);
- return mark;
+
+ if (future != null) {
+ future.complete(null);
+ }
+ return future;
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 2955677..7ed69c0 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -202,20 +202,17 @@ public class Mutation implements IMutation
public CompletableFuture<?> applyFuture()
{
Keyspace ks = Keyspace.open(keyspaceName);
- return ks.apply(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites);
+ return ks.applyFuture(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites, true);
+ }
+
+ public void apply(boolean durableWrites, boolean isDroppable)
+ {
+ Keyspace.open(keyspaceName).apply(this, durableWrites, true, isDroppable);
}
public void apply(boolean durableWrites)
{
- try
- {
- Keyspace ks = Keyspace.open(keyspaceName);
- Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, durableWrites));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
+ apply(durableWrites, true);
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index af8efb4..d53f0f8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -642,15 +642,7 @@ public class CommitLogReplayer
{
assert !newMutation.isEmpty();
- try
- {
- Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
-
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
keyspacesRecovered.add(keyspace);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0940950..ee1ba6a 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -147,14 +147,7 @@ public class PaxosState
{
Tracing.trace("Committing proposal {}", proposal);
Mutation mutation = proposal.makeMutation();
- try
- {
- Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation, true));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 040906b..b6b8387 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -178,14 +178,15 @@ public class StreamReceiveTask extends StreamTask
{
for (SSTableReader reader : readers)
{
+ Keyspace ks = Keyspace.open(reader.getKeyspaceName());
try (ISSTableScanner scanner = reader.getScanner())
{
while (scanner.hasNext())
{
try (UnfilteredRowIterator rowIterator = scanner.next())
{
- //Apply unsafe (we will flush below before transaction is done)
- new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ // MV *can* be applied unsafe as we flush below before transaction is done.
+ ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
}
}
}
[2/4] cassandra git commit: Make hint delivery async so MV hint is deferred on failure to acquire lock
Posted by pa...@apache.org.
Make hint delivery async so MV hint is deferred on failure to acquire lock
Patch by Paulo Motta; Reviewed by Benjamin Roth for CASSANDRA-12905
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48abc036
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48abc036
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48abc036
Branch: refs/heads/cassandra-3.X
Commit: 48abc0369799acca0521150e2c88d4032e01c3b5
Parents: 3faa0d9
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Dec 15 12:49:38 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:46:05 2016 -0200
----------------------------------------------------------------------
src/java/org/apache/cassandra/hints/Hint.java | 40 ++++++++++++++------
.../apache/cassandra/hints/HintVerbHandler.java | 4 +-
2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index cbb5e74..17fbf5d 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -19,8 +19,12 @@ package org.apache.cassandra.hints;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Throwables;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -68,7 +72,7 @@ public final class Hint
return new Hint(mutation, creationTime, mutation.smallestGCGS());
}
- /**
+ /*
* @param mutation the hinted mutation
* @param creationTime time of this hint's creation (in milliseconds since epoch)
* @param gcgs the smallest gcgs of all tables involved at the time of hint creation (in seconds)
@@ -81,19 +85,33 @@ public final class Hint
/**
* Applies the contained mutation unless it's expired, filtering out any updates for truncated tables
*/
- void apply()
+ CompletableFuture<?> applyFuture()
{
- if (!isLive())
- return;
+ if (isLive())
+ {
+ // filter out partition update for table that have been truncated since hint's creation
+ Mutation filtered = mutation;
+ for (UUID id : mutation.getColumnFamilyIds())
+ if (creationTime <= SystemKeyspace.getTruncatedAt(id))
+ filtered = filtered.without(id);
- // filter out partition update for table that have been truncated since hint's creation
- Mutation filtered = mutation;
- for (UUID id : mutation.getColumnFamilyIds())
- if (creationTime <= SystemKeyspace.getTruncatedAt(id))
- filtered = filtered.without(id);
+ if (!filtered.isEmpty())
+ return filtered.applyFuture();
+ }
+
+ return CompletableFuture.completedFuture(null);
+ }
- if (!filtered.isEmpty())
- filtered.apply();
+ void apply()
+ {
+ try
+ {
+ applyFuture().get();
+ }
+ catch (Exception e)
+ {
+ throw Throwables.propagate(e.getCause());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
index d8838a9..abcd1f9 100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -88,10 +88,8 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage>
else
{
// the common path - the node is both the destination and a valid replica for the hint.
- hint.apply();
+ hint.applyFuture().thenAccept(o -> reply(id, message.from)).exceptionally(e -> {logger.debug("Failed to apply hint", e); return null;});
}
-
- reply(id, message.from);
}
private static void reply(int id, InetAddress to)
[3/4] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Posted by pa...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/489be961
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/489be961
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/489be961
Branch: refs/heads/cassandra-3.X
Commit: 489be961c945e4330a9426d21b2bb903cc1d3a54
Parents: 73547a3 48abc03
Author: Paulo Motta <pa...@apache.org>
Authored: Thu Dec 15 16:46:32 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:49:10 2016 -0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 89 +++++++++++---------
src/java/org/apache/cassandra/db/Mutation.java | 17 ++--
.../db/commitlog/CommitLogReplayer.java | 10 +--
src/java/org/apache/cassandra/hints/Hint.java | 40 ++++++---
.../apache/cassandra/hints/HintVerbHandler.java | 4 +-
.../cassandra/service/paxos/PaxosState.java | 9 +-
.../cassandra/streaming/StreamReceiveTask.java | 6 +-
8 files changed, 90 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3db0179,63e095d..fa0c94a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,114 -1,5 +1,115 @@@
-3.0.11
+3.10
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
* Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
* Mark MVs as built after successful bootstrap (CASSANDRA-12984)
* Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index bd58f75,3715995..d9f8f62
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -478,63 -435,35 +476,67 @@@ public class Keyspac
if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
throw new RuntimeException("Testing write failures");
+ Lock[] locks = null;
+
boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
- final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
+
+ // If apply is not deferrable, no future is required, returns always null
+ if (isDeferrable && future == null) {
+ future = new CompletableFuture<>();
+ }
- Lock lock = null;
if (requiresViewUpdate)
{
mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
- while (true)
- {
- if (TEST_FAIL_MV_LOCKS_COUNT == 0)
- lock = ViewManager.acquireLockFor(mutation.key().getKey());
- else
- TEST_FAIL_MV_LOCKS_COUNT--;
- if (lock == null)
+ // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
+ Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds();
+ Iterator<UUID> idIterator = columnFamilyIds.iterator();
+
+ locks = new Lock[columnFamilyIds.size()];
+ for (int i = 0; i < columnFamilyIds.size(); i++)
+ {
+ UUID cfid = idIterator.next();
+ int lockKey = Objects.hash(mutation.key().getKey(), cfid);
+ while (true)
{
- //throw WTE only if request is droppable
- if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+ Lock lock = null;
+
+ if (TEST_FAIL_MV_LOCKS_COUNT == 0)
+ lock = ViewManager.acquireLockFor(lockKey);
+ else
+ TEST_FAIL_MV_LOCKS_COUNT--;
+
+ if (lock == null)
{
- // avoid throwing a WTE during commitlog replay
- if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
- logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
- Tracing.trace("Could not acquire MV lock");
- if (future != null)
++ //throw WTE only if request is droppable
++ if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
{
- future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
+ for (int j = 0; j < i; j++)
+ locks[j].unlock();
+
+ logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name);
+ Tracing.trace("Could not acquire MV lock");
+ if (future != null)
+ {
+ future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
- return mark;
++ return future;
+ }
+ else
+ throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+ }
+ else if (isDeferrable)
+ {
+ for (int j = 0; j < i; j++)
+ locks[j].unlock();
+
+ // This view update can't happen right now. so rather than keep this thread busy
+ // we will re-apply ourself to the queue and try again later
++ final CompletableFuture<?> mark = future;
+ StageManager.getStage(Stage.MUTATION).execute(() ->
- apply(mutation, writeCommitLog, true, isClReplay, mark)
++ apply(mutation, writeCommitLog, true, isDroppable, true, mark)
+ );
-
- return mark;
+ return future;
}
else
{
@@@ -561,13 -512,6 +563,15 @@@
break;
}
}
+
+ long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
- if (!isClReplay)
++ // Metrics are only collected for droppable write operations
++ // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
++ if (isDroppable)
+ {
+ for(UUID cfid : columnFamilyIds)
+ columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
+ }
}
int nowInSec = FBUtilities.nowInSeconds();
try (OpOrder.Group opGroup = writeOrder.start())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index eeb9bc8,d53f0f8..4d2971f
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -175,105 -208,38 +175,97 @@@ public class CommitLogReplayer implemen
return replayedCount.get();
}
- private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
+ /*
+ * Wrapper around initiating mutations read from the log to make it possible
+ * to spy on initiated mutations for test
+ */
+ @VisibleForTesting
+ public static class MutationInitiator
{
- if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
- {
- // There was no room in the segment to write a final header. No data could be present here.
- return -1;
- }
- reader.seek(offset);
- CRC32 crc = new CRC32();
- updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
- updateChecksumInt(crc, (int) (descriptor.id >>> 32));
- updateChecksumInt(crc, (int) reader.getPosition());
- int end = reader.readInt();
- long filecrc = reader.readInt() & 0xffffffffL;
- if (crc.getValue() != filecrc)
+ protected Future<Integer> initiateMutation(final Mutation mutation,
+ final long segmentId,
+ final int serializedSize,
+ final int entryLocation,
+ final CommitLogReplayer commitLogReplayer)
{
- if (end != 0 || filecrc != 0)
+ Runnable runnable = new WrappedRunnable()
{
- handleReplayError(false,
- "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
- "The end of segment marker should be zero.",
- offset, reader.getPath());
- }
- return -1;
- }
- else if (end < offset || end > reader.length())
- {
- handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
- offset, reader.getPath());
- return -1;
+ public void runMayThrow()
+ {
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+ return;
+ if (commitLogReplayer.pointInTimeExceeded(mutation))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+ // Rebuild the mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ Mutation newMutation = null;
+ for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
+ {
+ if (Schema.instance.getCF(update.metadata().cfId) == null)
+ continue; // dropped
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the commit log segment position
+ if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation)))
+ {
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(update);
+ commitLogReplayer.replayedCount.incrementAndGet();
+ }
+ }
+ if (newMutation != null)
+ {
+ assert !newMutation.isEmpty();
+
- try
- {
- Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
- }
- catch (ExecutionException e)
- {
- throw Throwables.propagate(e.getCause());
- }
-
++ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
+ commitLogReplayer.keyspacesReplayed.add(keyspace);
+ }
+ }
+ };
+ return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
}
- return end;
+ }
+
+ /**
+ * A set of known safe-to-discard commit log replay positions, based on
+ * the range covered by on disk sstables and those prior to the most recent truncation record
+ */
+ public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
+ {
+ IntervalSet.Builder<CommitLogPosition> builder = new IntervalSet.Builder<>();
+ for (SSTableReader reader : onDisk)
+ builder.addAll(reader.getSSTableMetadata().commitLogIntervals);
+
+ if (truncatedAt != null)
+ builder.add(CommitLogPosition.NONE, truncatedAt);
+ return builder.build();
+ }
+
+ /**
+ * Find the earliest commit log position that is not covered by the known flushed ranges for some table.
+ *
+ * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the
+ * given table was constructed* and hence we can start replay from the end of that interval.
+ *
+ * If such an interval is not known, we must replay from the beginning.
+ *
+ * * This is not true only until if the very first flush of a table stalled or failed, while the second or latter
+ * succeeded. The chances of this happening are at most very low, and if the assumption does prove to be
+ * incorrect during replay there is little chance that the affected deployment is in production.
+ */
+ public static CommitLogPosition firstNotCovered(Collection<IntervalSet<CommitLogPosition>> ranges)
+ {
+ return ranges.stream()
+ .map(intervals -> Iterables.getFirst(intervals.ends(), CommitLogPosition.NONE))
+ .min(Ordering.natural())
+ .get(); // iteration is per known-CF, there must be at least one.
}
abstract static class ReplayFilter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 8fe5a49,b6b8387..6c60b74
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -192,17 -185,8 +193,14 @@@ public class StreamReceiveTask extends
{
try (UnfilteredRowIterator rowIterator = scanner.next())
{
- // MV *can* be applied unsafe as we flush below before transaction is done.
- ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
+ Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
+
+ // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
+ // before transaction is done.
+ //
+ // If the CFS has CDC, however, these updates need to be written to the CommitLog
+ // so they get archived into the cdc_raw folder
- if (hasCDC)
- m.apply();
- else
- m.applyUnsafe();
++ ks.apply(m, hasCDC, true, false);
}
}
}
[4/4] cassandra git commit: Merge branch 'cassandra-3.11' into
cassandra-3.X
Posted by pa...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1e23ab5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1e23ab5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1e23ab5
Branch: refs/heads/cassandra-3.X
Commit: b1e23ab54f659755fdd7c1adf5c14b05088337f6
Parents: e5a77cf 489be96
Author: Paulo Motta <pa...@apache.org>
Authored: Thu Dec 15 16:50:30 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:50:30 2016 -0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 89 +++++++++++---------
src/java/org/apache/cassandra/db/Mutation.java | 17 ++--
.../db/commitlog/CommitLogReplayer.java | 10 +--
src/java/org/apache/cassandra/hints/Hint.java | 40 ++++++---
.../apache/cassandra/hints/HintVerbHandler.java | 4 +-
.../cassandra/service/paxos/PaxosState.java | 9 +-
.../cassandra/streaming/StreamReceiveTask.java | 6 +-
8 files changed, 90 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1e23ab5/CHANGES.txt
----------------------------------------------------------------------