You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2016/12/15 19:17:39 UTC

[1/4] cassandra git commit: Retry acquire MV lock on failure instead of throwing WTE on streaming

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.X e5a77cfea -> b1e23ab54


Retry acquire MV lock on failure instead of throwing WTE on streaming

Patch by Benjamin Roth; Reviewed by Paulo Motta for CASSANDRA-12905


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3faa0d92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3faa0d92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3faa0d92

Branch: refs/heads/cassandra-3.X
Commit: 3faa0d925791be085b92949a0a0ec20f7e6ae368
Parents: 9fc1ffb
Author: brstgt <br...@googlemail.com>
Authored: Thu Dec 15 12:42:31 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:46:00 2016 -0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 92 +++++++++++---------
 src/java/org/apache/cassandra/db/Mutation.java  | 17 ++--
 .../db/commitlog/CommitLogReplayer.java         | 10 +--
 .../cassandra/service/paxos/PaxosState.java     |  9 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  5 +-
 6 files changed, 63 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e69bf08..63e095d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
  * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
  * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
  * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ec5102b..3715995 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -50,8 +52,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * It represents a Keyspace.
@@ -62,7 +62,7 @@ public class Keyspace
 
     private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
     private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty();
-    private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count", "0"), 0);
+    private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger("cassandra.test.fail_mv_locks_count", 0);
 
     public final KeyspaceMetrics metric;
 
@@ -379,42 +379,40 @@ public class Keyspace
         }
     }
 
-    public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog)
-    {
-        return apply(mutation, writeCommitLog, true, false, null);
-    }
-
-    /**
-     * Should be used if caller is blocking and runs in mutation stage.
-     * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending
-     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
-     *
-     * @param mutation
-     * @param writeCommitLog
-     * @return
-     */
-    public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean writeCommitLog)
+    public CompletableFuture<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        return apply(mutation, writeCommitLog, true, false, false, null);
+        return apply(mutation, writeCommitLog, updateIndexes, true, true, null);
     }
 
-    public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+    public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        return apply(mutation, writeCommitLog, updateIndexes, false, null);
+        apply(mutation, writeCommitLog, updateIndexes, true);
     }
 
-    public CompletableFuture<?> applyFromCommitLog(Mutation mutation)
+    public void apply(final Mutation mutation,
+                      final boolean writeCommitLog)
     {
-        return apply(mutation, false, true, true, null);
+        apply(mutation, writeCommitLog, true, true);
     }
 
-    public CompletableFuture<?> apply(final Mutation mutation,
-                                      final boolean writeCommitLog,
-                                      boolean updateIndexes,
-                                      boolean isClReplay,
-                                      CompletableFuture<?> future)
+    /**
+     * If apply is blocking, apply must not be deferred
+     * Otherwise there is a race condition where ALL mutation workers are being blocked, ending
+     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
+     *
+     * @param mutation       the row to write.  Must not be modified after calling apply, since commitlog append
+     *                       may happen concurrently, depending on the CL Executor type.
+     * @param writeCommitLog false to disable commitlog append entirely
+     * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
+     * @param isDroppable    true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
+     * @throws ExecutionException
+     */
+    public void apply(final Mutation mutation,
+                      final boolean writeCommitLog,
+                      boolean updateIndexes,
+                      boolean isDroppable)
     {
-        return apply(mutation, writeCommitLog, updateIndexes, isClReplay, true, future);
+        apply(mutation, writeCommitLog, updateIndexes, isDroppable, false, null);
     }
 
     /**
@@ -424,13 +422,13 @@ public class Keyspace
      *                       may happen concurrently, depending on the CL Executor type.
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
-     * @param isClReplay     true if caller is the commitlog replayer
+     * @param isDroppable    true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms
      * @param isDeferrable   true if caller is not waiting for future to complete, so that future may be deferred
      */
-    public CompletableFuture<?> apply(final Mutation mutation,
+    private CompletableFuture<?> apply(final Mutation mutation,
                                       final boolean writeCommitLog,
                                       boolean updateIndexes,
-                                      boolean isClReplay,
+                                      boolean isDroppable,
                                       boolean isDeferrable,
                                       CompletableFuture<?> future)
     {
@@ -438,7 +436,11 @@ public class Keyspace
             throw new RuntimeException("Testing write failures");
 
         boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
-        final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
+
+        // If apply is not deferrable, no future is required, returns always null
+        if (isDeferrable && future == null) {
+            future = new CompletableFuture<>();
+        }
 
         Lock lock = null;
         if (requiresViewUpdate)
@@ -453,15 +455,15 @@ public class Keyspace
 
                 if (lock == null)
                 {
-                    // avoid throwing a WTE during commitlog replay
-                    if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                    //throw WTE only if request is droppable
+                    if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
                     {
                         logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
                         Tracing.trace("Could not acquire MV lock");
                         if (future != null)
                         {
                             future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
-                            return mark;
+                            return future;
                         }
                         else
                         {
@@ -472,11 +474,12 @@ public class Keyspace
                     {
                         //This view update can't happen right now. so rather than keep this thread busy
                         // we will re-apply ourself to the queue and try again later
+                        final CompletableFuture<?> mark = future;
                         StageManager.getStage(Stage.MUTATION).execute(() ->
-                                apply(mutation, writeCommitLog, true, isClReplay, mark)
+                                apply(mutation, writeCommitLog, true, isDroppable, true, mark)
                         );
 
-                        return mark;
+                        return future;
                     }
                     else
                     {
@@ -499,7 +502,9 @@ public class Keyspace
                 else
                 {
                     long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
-                    if (!isClReplay)
+                    // Metrics are only collected for droppable write operations
+                    // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
+                    if (isDroppable)
                     {
                         for (UUID cfid : mutation.getColumnFamilyIds())
                             columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
@@ -534,7 +539,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Creating materialized view mutations from base table replica");
-                        viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog && !isClReplay, baseComplete);
+                        viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog, baseComplete);
                     }
                     catch (Throwable t)
                     {
@@ -553,8 +558,11 @@ public class Keyspace
                 if (requiresViewUpdate)
                     baseComplete.set(System.currentTimeMillis());
             }
-            mark.complete(null);
-            return mark;
+
+            if (future != null) {
+                future.complete(null);
+            }
+            return future;
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 2955677..7ed69c0 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -202,20 +202,17 @@ public class Mutation implements IMutation
     public CompletableFuture<?> applyFuture()
     {
         Keyspace ks = Keyspace.open(keyspaceName);
-        return ks.apply(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites);
+        return ks.applyFuture(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites, true);
+    }
+
+    public void apply(boolean durableWrites, boolean isDroppable)
+    {
+        Keyspace.open(keyspaceName).apply(this, durableWrites, true, isDroppable);
     }
 
     public void apply(boolean durableWrites)
     {
-        try
-        {
-            Keyspace ks = Keyspace.open(keyspaceName);
-            Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, durableWrites));
-        }
-        catch (ExecutionException e)
-        {
-            throw Throwables.propagate(e.getCause());
-        }
+        apply(durableWrites, true);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index af8efb4..d53f0f8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -642,15 +642,7 @@ public class CommitLogReplayer
                 {
                     assert !newMutation.isEmpty();
 
-                    try
-                    {
-                        Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw Throwables.propagate(e.getCause());
-                    }
-
+                    Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
                     keyspacesRecovered.add(keyspace);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0940950..ee1ba6a 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -147,14 +147,7 @@ public class PaxosState
             {
                 Tracing.trace("Committing proposal {}", proposal);
                 Mutation mutation = proposal.makeMutation();
-                try
-                {
-                    Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation, true));
-                }
-                catch (ExecutionException e)
-                {
-                    throw Throwables.propagate(e.getCause());
-                }
+                Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 040906b..b6b8387 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -178,14 +178,15 @@ public class StreamReceiveTask extends StreamTask
                     {
                         for (SSTableReader reader : readers)
                         {
+                            Keyspace ks = Keyspace.open(reader.getKeyspaceName());
                             try (ISSTableScanner scanner = reader.getScanner())
                             {
                                 while (scanner.hasNext())
                                 {
                                     try (UnfilteredRowIterator rowIterator = scanner.next())
                                     {
-                                        //Apply unsafe (we will flush below before transaction is done)
-                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+                                        // MV *can* be applied unsafe as we flush below before transaction is done.
+                                        ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
                                     }
                                 }
                             }


[2/4] cassandra git commit: Make hint delivery async so MV hint is deferred on failure to acquire lock

Posted by pa...@apache.org.
Make hint delivery async so MV hint is deferred on failure to acquire lock

Patch by Paulo Motta; Reviewed by Benjamin Roth for CASSANDRA-12905


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48abc036
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48abc036
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48abc036

Branch: refs/heads/cassandra-3.X
Commit: 48abc0369799acca0521150e2c88d4032e01c3b5
Parents: 3faa0d9
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Dec 15 12:49:38 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:46:05 2016 -0200

----------------------------------------------------------------------
 src/java/org/apache/cassandra/hints/Hint.java   | 40 ++++++++++++++------
 .../apache/cassandra/hints/HintVerbHandler.java |  4 +-
 2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index cbb5e74..17fbf5d 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -19,8 +19,12 @@ package org.apache.cassandra.hints;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Throwables;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -68,7 +72,7 @@ public final class Hint
         return new Hint(mutation, creationTime, mutation.smallestGCGS());
     }
 
-    /**
+    /*
      * @param mutation the hinted mutation
      * @param creationTime time of this hint's creation (in milliseconds since epoch)
      * @param gcgs the smallest gcgs of all tables involved at the time of hint creation (in seconds)
@@ -81,19 +85,33 @@ public final class Hint
     /**
      * Applies the contained mutation unless it's expired, filtering out any updates for truncated tables
      */
-    void apply()
+    CompletableFuture<?> applyFuture()
     {
-        if (!isLive())
-            return;
+        if (isLive())
+        {
+            // filter out partition update for table that have been truncated since hint's creation
+            Mutation filtered = mutation;
+            for (UUID id : mutation.getColumnFamilyIds())
+                if (creationTime <= SystemKeyspace.getTruncatedAt(id))
+                    filtered = filtered.without(id);
 
-        // filter out partition update for table that have been truncated since hint's creation
-        Mutation filtered = mutation;
-        for (UUID id : mutation.getColumnFamilyIds())
-            if (creationTime <= SystemKeyspace.getTruncatedAt(id))
-                filtered = filtered.without(id);
+            if (!filtered.isEmpty())
+                return filtered.applyFuture();
+        }
+
+        return CompletableFuture.completedFuture(null);
+    }
 
-        if (!filtered.isEmpty())
-            filtered.apply();
+    void apply()
+    {
+        try
+        {
+            applyFuture().get();
+        }
+        catch (Exception e)
+        {
+            throw Throwables.propagate(e.getCause());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/48abc036/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintVerbHandler.java b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
index d8838a9..abcd1f9 100644
--- a/src/java/org/apache/cassandra/hints/HintVerbHandler.java
+++ b/src/java/org/apache/cassandra/hints/HintVerbHandler.java
@@ -88,10 +88,8 @@ public final class HintVerbHandler implements IVerbHandler<HintMessage>
         else
         {
             // the common path - the node is both the destination and a valid replica for the hint.
-            hint.apply();
+            hint.applyFuture().thenAccept(o -> reply(id, message.from)).exceptionally(e -> {logger.debug("Failed to apply hint", e); return null;});
         }
-
-        reply(id, message.from);
     }
 
     private static void reply(int id, InetAddress to)


[3/4] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by pa...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/489be961
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/489be961
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/489be961

Branch: refs/heads/cassandra-3.X
Commit: 489be961c945e4330a9426d21b2bb903cc1d3a54
Parents: 73547a3 48abc03
Author: Paulo Motta <pa...@apache.org>
Authored: Thu Dec 15 16:46:32 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:49:10 2016 -0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 89 +++++++++++---------
 src/java/org/apache/cassandra/db/Mutation.java  | 17 ++--
 .../db/commitlog/CommitLogReplayer.java         | 10 +--
 src/java/org/apache/cassandra/hints/Hint.java   | 40 ++++++---
 .../apache/cassandra/hints/HintVerbHandler.java |  4 +-
 .../cassandra/service/paxos/PaxosState.java     |  9 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  6 +-
 8 files changed, 90 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 3db0179,63e095d..fa0c94a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,114 -1,5 +1,115 @@@
 -3.0.11
 +3.10
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
   * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
   * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
   * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index bd58f75,3715995..d9f8f62
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -478,63 -435,35 +476,67 @@@ public class Keyspac
          if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
              throw new RuntimeException("Testing write failures");
  
 +        Lock[] locks = null;
 +
          boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
-         final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
+ 
+         // If apply is not deferrable, no future is required and null is always returned
+         if (isDeferrable && future == null) {
+             future = new CompletableFuture<>();
+         }
  
 -        Lock lock = null;
          if (requiresViewUpdate)
          {
              mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
 -            while (true)
 -            {
 -                if (TEST_FAIL_MV_LOCKS_COUNT == 0)
 -                    lock = ViewManager.acquireLockFor(mutation.key().getKey());
 -                else
 -                    TEST_FAIL_MV_LOCKS_COUNT--;
  
 -                if (lock == null)
 +            // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
 +            Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds();
 +            Iterator<UUID> idIterator = columnFamilyIds.iterator();
 +
 +            locks = new Lock[columnFamilyIds.size()];
 +            for (int i = 0; i < columnFamilyIds.size(); i++)
 +            {
 +                UUID cfid = idIterator.next();
 +                int lockKey = Objects.hash(mutation.key().getKey(), cfid);
 +                while (true)
                  {
 -                    //throw WTE only if request is droppable
 -                    if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
 +                    Lock lock = null;
 +
 +                    if (TEST_FAIL_MV_LOCKS_COUNT == 0)
 +                        lock = ViewManager.acquireLockFor(lockKey);
 +                    else
 +                        TEST_FAIL_MV_LOCKS_COUNT--;
 +
 +                    if (lock == null)
                      {
-                         // avoid throwing a WTE during commitlog replay
-                         if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
 -                        logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
 -                        Tracing.trace("Could not acquire MV lock");
 -                        if (future != null)
++                        //throw WTE only if request is droppable
++                        if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
                          {
 -                            future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
 +                            for (int j = 0; j < i; j++)
 +                                locks[j].unlock();
 +
 +                            logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name);
 +                            Tracing.trace("Could not acquire MV lock");
 +                            if (future != null)
 +                            {
 +                                future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
-                                 return mark;
++                                return future;
 +                            }
 +                            else
 +                                throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
 +                        }
 +                        else if (isDeferrable)
 +                        {
 +                            for (int j = 0; j < i; j++)
 +                                locks[j].unlock();
 +
 +                            // This view update can't happen right now, so rather than keep this thread busy
 +                            // we will re-apply ourselves to the queue and try again later
++                            final CompletableFuture<?> mark = future;
 +                            StageManager.getStage(Stage.MUTATION).execute(() ->
-                                                                           apply(mutation, writeCommitLog, true, isClReplay, mark)
++                                                                          apply(mutation, writeCommitLog, true, isDroppable, true, mark)
 +                            );
- 
-                             return mark;
+                             return future;
                          }
                          else
                          {
@@@ -561,13 -512,6 +563,15 @@@
                      break;
                  }
              }
 +
 +            long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
-             if (!isClReplay)
++            // Metrics are only collected for droppable write operations
++            // Bulk non-droppable operations (e.g. commitlog replay, hint delivery) are not measured
++            if (isDroppable)
 +            {
 +                for(UUID cfid : columnFamilyIds)
 +                    columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS);
 +            }
          }
          int nowInSec = FBUtilities.nowInSeconds();
          try (OpOrder.Group opGroup = writeOrder.start())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index eeb9bc8,d53f0f8..4d2971f
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -175,105 -208,38 +175,97 @@@ public class CommitLogReplayer implemen
          return replayedCount.get();
      }
  
 -    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
 +    /*
 +     * Wrapper around initiating mutations read from the log to make it possible
 +     * to spy on initiated mutations for test
 +     */
 +    @VisibleForTesting
 +    public static class MutationInitiator
      {
 -        if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
 -        {
 -            // There was no room in the segment to write a final header. No data could be present here.
 -            return -1;
 -        }
 -        reader.seek(offset);
 -        CRC32 crc = new CRC32();
 -        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
 -        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
 -        updateChecksumInt(crc, (int) reader.getPosition());
 -        int end = reader.readInt();
 -        long filecrc = reader.readInt() & 0xffffffffL;
 -        if (crc.getValue() != filecrc)
 +        protected Future<Integer> initiateMutation(final Mutation mutation,
 +                                                   final long segmentId,
 +                                                   final int serializedSize,
 +                                                   final int entryLocation,
 +                                                   final CommitLogReplayer commitLogReplayer)
          {
 -            if (end != 0 || filecrc != 0)
 +            Runnable runnable = new WrappedRunnable()
              {
 -                handleReplayError(false,
 -                                  "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
 -                                  "The end of segment marker should be zero.",
 -                                  offset, reader.getPath());
 -            }
 -            return -1;
 -        }
 -        else if (end < offset || end > reader.length())
 -        {
 -            handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
 -                              offset, reader.getPath());
 -            return -1;
 +                public void runMayThrow()
 +                {
 +                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
 +                        return;
 +                    if (commitLogReplayer.pointInTimeExceeded(mutation))
 +                        return;
 +
 +                    final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
 +
 +                    // Rebuild the mutation, omitting column families that
 +                    //    a) the user has requested that we ignore,
 +                    //    b) have already been flushed,
 +                    // or c) are part of a cf that was dropped.
 +                    // Keep in mind that the cf.name() is suspect. Do everything based on the cfid instead.
 +                    Mutation newMutation = null;
 +                    for (PartitionUpdate update : commitLogReplayer.replayFilter.filter(mutation))
 +                    {
 +                        if (Schema.instance.getCF(update.metadata().cfId) == null)
 +                            continue; // dropped
 +
 +                        // replay if current segment is newer than last flushed one or,
 +                        // if it is the last known segment, if we are after the commit log segment position
 +                        if (commitLogReplayer.shouldReplay(update.metadata().cfId, new CommitLogPosition(segmentId, entryLocation)))
 +                        {
 +                            if (newMutation == null)
 +                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
 +                            newMutation.add(update);
 +                            commitLogReplayer.replayedCount.incrementAndGet();
 +                        }
 +                    }
 +                    if (newMutation != null)
 +                    {
 +                        assert !newMutation.isEmpty();
 +
-                         try
-                         {
-                             Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
-                         }
-                         catch (ExecutionException e)
-                         {
-                             throw Throwables.propagate(e.getCause());
-                         }
- 
++                        Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false);
 +                        commitLogReplayer.keyspacesReplayed.add(keyspace);
 +                    }
 +                }
 +            };
 +            return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
          }
 -        return end;
 +    }
 +
 +    /**
 +     * A set of known safe-to-discard commit log replay positions, based on
 +     * the range covered by on disk sstables and those prior to the most recent truncation record
 +     */
 +    public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTableReader> onDisk, CommitLogPosition truncatedAt)
 +    {
 +        IntervalSet.Builder<CommitLogPosition> builder = new IntervalSet.Builder<>();
 +        for (SSTableReader reader : onDisk)
 +            builder.addAll(reader.getSSTableMetadata().commitLogIntervals);
 +
 +        if (truncatedAt != null)
 +            builder.add(CommitLogPosition.NONE, truncatedAt);
 +        return builder.build();
 +    }
 +
 +    /**
 +     * Find the earliest commit log position that is not covered by the known flushed ranges for some table.
 +     *
 +     * For efficiency this assumes that the first contiguously flushed interval we know of contains the moment that the
 +     * given table was constructed* and hence we can start replay from the end of that interval.
 +     *
 +     * If such an interval is not known, we must replay from the beginning.
 +     *
 +     * * This is untrue only if the very first flush of a table stalled or failed, while the second or a later one
 +     *   succeeded. The chances of this happening are at most very low, and if the assumption does prove to be
 +     *   incorrect during replay there is little chance that the affected deployment is in production.
 +     */
 +    public static CommitLogPosition firstNotCovered(Collection<IntervalSet<CommitLogPosition>> ranges)
 +    {
 +        return ranges.stream()
 +                .map(intervals -> Iterables.getFirst(intervals.ends(), CommitLogPosition.NONE))
 +                .min(Ordering.natural())
 +                .get(); // iteration is per known-CF, there must be at least one.
      }
  
      abstract static class ReplayFilter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/hints/HintVerbHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489be961/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 8fe5a49,b6b8387..6c60b74
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -192,17 -185,8 +193,14 @@@ public class StreamReceiveTask extends 
                                  {
                                      try (UnfilteredRowIterator rowIterator = scanner.next())
                                      {
 -                                        // MV *can* be applied unsafe as we flush below before transaction is done.
 -                                        ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false);
 +                                        Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
 +
 +                                        // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
 +                                        // before transaction is done.
 +                                        //
 +                                        // If the CFS has CDC, however, these updates need to be written to the CommitLog
 +                                        // so they get archived into the cdc_raw folder
-                                         if (hasCDC)
-                                             m.apply();
-                                         else
-                                             m.applyUnsafe();
++                                        ks.apply(m, hasCDC, true, false);
                                      }
                                  }
                              }


[4/4] cassandra git commit: Merge branch 'cassandra-3.11' into cassandra-3.X

Posted by pa...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1e23ab5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1e23ab5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1e23ab5

Branch: refs/heads/cassandra-3.X
Commit: b1e23ab54f659755fdd7c1adf5c14b05088337f6
Parents: e5a77cf 489be96
Author: Paulo Motta <pa...@apache.org>
Authored: Thu Dec 15 16:50:30 2016 -0200
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 15 16:50:30 2016 -0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 89 +++++++++++---------
 src/java/org/apache/cassandra/db/Mutation.java  | 17 ++--
 .../db/commitlog/CommitLogReplayer.java         | 10 +--
 src/java/org/apache/cassandra/hints/Hint.java   | 40 ++++++---
 .../apache/cassandra/hints/HintVerbHandler.java |  4 +-
 .../cassandra/service/paxos/PaxosState.java     |  9 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  6 +-
 8 files changed, 90 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1e23ab5/CHANGES.txt
----------------------------------------------------------------------