You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/12/04 17:50:41 UTC

cassandra git commit: bound maximum in-flight commit log replay mutation bytes to 64 megabytes (tunable via cassandra.commitlog_max_outstanding_replay_bytes)

Repository: cassandra
Updated Branches:
  refs/heads/trunk 52d5eb04f -> 1c41a9ac2


bound maximum in-flight commit log replay mutation bytes to 64 megabytes (tunable via cassandra.commitlog_max_outstanding_replay_bytes)

Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-8639


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c41a9ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c41a9ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c41a9ac

Branch: refs/heads/trunk
Commit: 1c41a9ac2c147ed111d9d8fba53652707dac7df0
Parents: 52d5eb0
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Nov 24 15:17:10 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Dec 4 11:49:41 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +-
 .../db/commitlog/CommitLogReplayer.java         | 131 +++++++++++-------
 .../cassandra/db/RecoveryManagerTest.java       | 137 +++++++++++++++++++
 4 files changed, 222 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1607a66..6c46183 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
  * Normalize all scripts (CASSANDRA-10679)
  * Make compression ratio much more accurate (CASSANDRA-10225)
  * Optimize building of Clustering object when only one is created (CASSANDRA-10409)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 2a5970d..8830c99 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,7 +18,8 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
-
+   - bound maximum in-flight commit log replay mutation bytes to 64 megabytes
+     tunable via cassandra.commitlog_max_outstanding_replay_bytes
    - Support for type casting has been added to the selection clause.
 
 Upgrading

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 2668bba..5010696 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CRC32;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
@@ -54,7 +55,6 @@ import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -65,13 +65,17 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 
 public class CommitLogReplayer
 {
+    @VisibleForTesting
+    public static long MAX_OUTSTANDING_REPLAY_BYTES = Long.getLong("cassandra.commitlog_max_outstanding_replay_bytes", 1024 * 1024 * 64);
+    @VisibleForTesting
+    public static MutationInitiator mutationInitiator = new MutationInitiator();
     static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
     private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 
     private final Set<Keyspace> keyspacesRecovered;
-    private final List<Future<?>> futures;
+    private final Queue<Future<Integer>> futures;
     private final Map<UUID, AtomicInteger> invalidMutations;
     private final AtomicInteger replayedCount;
     private final Map<UUID, ReplayPosition> cfPositions;
@@ -79,14 +83,74 @@ public class CommitLogReplayer
     private final CRC32 checksum;
     private byte[] buffer;
     private byte[] uncompressedBuffer;
+    private long pendingMutationBytes = 0;
 
     private final ReplayFilter replayFilter;
     private final CommitLogArchiver archiver;
 
+    /*
+     * Wrapper around initiating mutations read from the log to make it possible
+     * to spy on initiated mutations for test
+     */
+    @VisibleForTesting
+    public static class MutationInitiator
+    {
+        protected Future<Integer> initiateMutation(final Mutation mutation,
+                                                   final long segmentId,
+                                                   final int serializedSize,
+                                                   final long entryLocation,
+                                                   final CommitLogReplayer clr)
+        {
+            Runnable runnable = new WrappedRunnable()
+            {
+                public void runMayThrow() throws IOException
+                {
+                    if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+                        return;
+                    if (clr.pointInTimeExceeded(mutation))
+                        return;
+
+                    final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+                    // Rebuild the mutation, omitting column families that
+                    //    a) the user has requested that we ignore,
+                    //    b) have already been flushed,
+                    // or c) are part of a cf that was dropped.
+                    // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+                    Mutation newMutation = null;
+                    for (PartitionUpdate update : clr.replayFilter.filter(mutation))
+                    {
+                        if (Schema.instance.getCF(update.metadata().cfId) == null)
+                            continue; // dropped
+
+                        ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId);
+
+                        // replay if current segment is newer than last flushed one or,
+                        // if it is the last known segment, if we are after the replay position
+                        if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
+                        {
+                            if (newMutation == null)
+                                newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+                            newMutation.add(update);
+                            clr.replayedCount.incrementAndGet();
+                        }
+                    }
+                    if (newMutation != null)
+                    {
+                        assert !newMutation.isEmpty();
+                        Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation);
+                        clr.keyspacesRecovered.add(keyspace);
+                    }
+                }
+            };
+            return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
+        }
+    }
+
     CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
     {
         this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
-        this.futures = new ArrayList<Future<?>>();
+        this.futures = new ArrayDeque<Future<Integer>>();
         this.buffer = new byte[4096];
         this.uncompressedBuffer = new byte[4096];
         this.invalidMutations = new HashMap<UUID, AtomicInteger>();
@@ -163,6 +227,8 @@ public class CommitLogReplayer
         // flush replayed keyspaces
         futures.clear();
         boolean flushingSystem = false;
+
+        List<Future<?>> futures = new ArrayList<Future<?>>();
         for (Keyspace keyspace : keyspacesRecovered)
         {
             if (keyspace.getName().equals(SystemKeyspace.NAME))
@@ -176,6 +242,7 @@ public class CommitLogReplayer
             futures.add(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceFlush());
 
         FBUtilities.waitOnFutures(futures);
+
         return replayedCount.get();
     }
 
@@ -565,53 +632,19 @@ public class CommitLogReplayer
         if (logger.isTraceEnabled())
             logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
 
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws IOException
-            {
-                if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
-                    return;
-                if (pointInTimeExceeded(mutation))
-                    return;
-
-                final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-
-                // Rebuild the mutation, omitting column families that
-                //    a) the user has requested that we ignore,
-                //    b) have already been flushed,
-                // or c) are part of a cf that was dropped.
-                // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
-                Mutation newMutation = null;
-                for (PartitionUpdate update : replayFilter.filter(mutation))
-                {
-                    if (Schema.instance.getCF(update.metadata().cfId) == null)
-                        continue; // dropped
-
-                    ReplayPosition rp = cfPositions.get(update.metadata().cfId);
-
-                    // replay if current segment is newer than last flushed one or,
-                    // if it is the last known segment, if we are after the replay position
-                    if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
-                    {
-                        if (newMutation == null)
-                            newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
-                        newMutation.add(update);
-                        replayedCount.incrementAndGet();
-                    }
-                }
-                if (newMutation != null)
-                {
-                    assert !newMutation.isEmpty();
-                    Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation);
-                    keyspacesRecovered.add(keyspace);
-                }
-            }
-        };
-        futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
-        if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+        pendingMutationBytes += size;
+        futures.offer(mutationInitiator.initiateMutation(mutation,
+                                                         desc.id,
+                                                         size,
+                                                         entryLocation,
+                                                         this));
+        //If there are finished mutations, or too many outstanding bytes/mutations
+        //drain the futures in the queue
+        while (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT
+                || pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES
+                || (!futures.isEmpty() && futures.peek().isDone()))
         {
-            FBUtilities.waitOnFutures(futures);
-            futures.clear();
+            pendingMutationBytes -= FBUtilities.waitOnFuture(futures.poll());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index baf9466..788757c 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -20,7 +20,12 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.util.Date;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +50,71 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogArchiver;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class RecoveryManagerTest
 {
     private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
+    static final Semaphore blocker = new Semaphore(0);
+    static final Semaphore blocked = new Semaphore(0);
+    static CommitLogReplayer.MutationInitiator originalInitiator = null;
+    static final CommitLogReplayer.MutationInitiator mockInitiator = new CommitLogReplayer.MutationInitiator()
+    {
+        @Override
+        protected Future<Integer> initiateMutation(final Mutation mutation,
+                final long segmentId,
+                final int serializedSize,
+                final long entryLocation,
+                final CommitLogReplayer clr)
+        {
+            final Future<Integer> toWrap = super.initiateMutation(mutation,
+                                                                  segmentId,
+                                                                  serializedSize,
+                                                                  entryLocation,
+                                                                  clr);
+            return new Future<Integer>()
+            {
+
+                @Override
+                public boolean cancel(boolean mayInterruptIfRunning)
+                {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public boolean isCancelled()
+                {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public boolean isDone()
+                {
+                    return blocker.availablePermits() > 0 && toWrap.isDone();
+                }
+
+                @Override
+                public Integer get() throws InterruptedException, ExecutionException
+                {
+                    System.out.println("Got blocker once");
+                    blocked.release();
+                    blocker.acquire();
+                    return toWrap.get();
+                }
+
+                @Override
+                public Integer get(long timeout, TimeUnit unit)
+                        throws InterruptedException, ExecutionException, TimeoutException
+                {
+                    blocked.release();
+                    blocker.tryAcquire(1, timeout, unit);
+                    return toWrap.get(timeout, unit);
+                }
+
+            };
+        }
+    };
 
     private static final String KEYSPACE1 = "RecoveryManagerTest1";
     private static final String CF_STANDARD1 = "Standard1";
@@ -86,6 +151,78 @@ public class RecoveryManagerTest
     }
 
     @Test
+    public void testRecoverBlocksOnBytesOutstanding() throws Exception
+    {
+        long originalMaxOutstanding = CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES;
+        CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = 1;
+        CommitLogReplayer.MutationInitiator originalInitiator = CommitLogReplayer.mutationInitiator;
+        CommitLogReplayer.mutationInitiator = mockInitiator;
+        try
+        {
+            CommitLog.instance.resetUnsafe(true);
+            Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+            Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
+
+            UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+                .clustering("col1").add("val", "1")
+                .build());
+
+            UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+                                           .clustering("col2").add("val", "1")
+                                           .build());
+
+            keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
+            keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
+
+            DecoratedKey dk = Util.dk("keymulti");
+            Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).isEmpty());
+            Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
+
+            final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+            Thread t = new Thread() {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
+                    }
+                    catch (Throwable t)
+                    {
+                        err.set(t);
+                    }
+                }
+            };
+            t.start();
+            Assert.assertTrue(blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
+            Thread.sleep(100);
+            Assert.assertTrue(t.isAlive());
+            blocker.release(Integer.MAX_VALUE);
+            t.join(20 * 1000);
+
+            if (err.get() != null)
+                throw new RuntimeException(err.get());
+
+            if (t.isAlive())
+            {
+                Throwable toPrint = new Throwable();
+                toPrint.setStackTrace(Thread.getAllStackTraces().get(t));
+                toPrint.printStackTrace(System.out);
+            }
+            Assert.assertFalse(t.isAlive());
+
+            Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
+            Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
+        }
+        finally
+        {
+            CommitLogReplayer.mutationInitiator = originalInitiator;
+            CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = originalMaxOutstanding;
+        }
+    }
+
+
+    @Test
     public void testOne() throws IOException
     {
         CommitLog.instance.resetUnsafe(true);