You are viewing a plain text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/12/04 17:50:41 UTC
cassandra git commit: bound maximum in-flight commit log replay
mutation bytes to 64 megabytes (tunable via
cassandra.commitlog_max_outstanding_replay_bytes)
Repository: cassandra
Updated Branches:
refs/heads/trunk 52d5eb04f -> 1c41a9ac2
bound maximum in-flight commit log replay mutation bytes to 64 megabytes (tunable via cassandra.commitlog_max_outstanding_replay_bytes)
Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-8639
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c41a9ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c41a9ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c41a9ac
Branch: refs/heads/trunk
Commit: 1c41a9ac2c147ed111d9d8fba53652707dac7df0
Parents: 52d5eb0
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Nov 24 15:17:10 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Fri Dec 4 11:49:41 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +-
.../db/commitlog/CommitLogReplayer.java | 131 +++++++++++-------
.../cassandra/db/RecoveryManagerTest.java | 137 +++++++++++++++++++
4 files changed, 222 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1607a66..6c46183 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.2
+ * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
* Normalize all scripts (CASSANDRA-10679)
* Make compression ratio much more accurate (CASSANDRA-10225)
* Optimize building of Clustering object when only one is created (CASSANDRA-10409)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 2a5970d..8830c99 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,7 +18,8 @@ using the provided 'sstableupgrade' tool.
New features
------------
-
+ - bound maximum in-flight commit log replay mutation bytes to 64 megabytes
+ (tunable via cassandra.commitlog_max_outstanding_replay_bytes)
- Support for type casting has been added to the selection clause.
Upgrading
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 2668bba..5010696 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
@@ -54,7 +55,6 @@ import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.NIODataInputStream;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -65,13 +65,17 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
public class CommitLogReplayer
{
+ @VisibleForTesting
+ public static long MAX_OUTSTANDING_REPLAY_BYTES = Long.getLong("cassandra.commitlog_max_outstanding_replay_bytes", 1024 * 1024 * 64);
+ @VisibleForTesting
+ public static MutationInitiator mutationInitiator = new MutationInitiator();
static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
private final Set<Keyspace> keyspacesRecovered;
- private final List<Future<?>> futures;
+ private final Queue<Future<Integer>> futures;
private final Map<UUID, AtomicInteger> invalidMutations;
private final AtomicInteger replayedCount;
private final Map<UUID, ReplayPosition> cfPositions;
@@ -79,14 +83,74 @@ public class CommitLogReplayer
private final CRC32 checksum;
private byte[] buffer;
private byte[] uncompressedBuffer;
+ private long pendingMutationBytes = 0;
private final ReplayFilter replayFilter;
private final CommitLogArchiver archiver;
+ /*
+ * Starts the asynchronous replay of a single mutation read from the log.
+ * It is a separate, overridable class (installed through the static
+ * mutationInitiator field) so that tests can spy on, or delay, the
+ * mutations that replay initiates.
+ */
+ @VisibleForTesting
+ public static class MutationInitiator
+ {
+ /**
+ * Submits a task to the MUTATION stage that re-applies whatever parts of
+ * {@code mutation} still need replaying. The task does nothing if the
+ * mutation's keyspace no longer exists or the replayer's point-in-time
+ * limit is exceeded.
+ *
+ * @param mutation the mutation deserialized from the commit log
+ * @param segmentId id of the segment the entry was read from; compared
+ * against each table's ReplayPosition.segment
+ * @param serializedSize serialized size of the mutation; it becomes the
+ * future's result so the caller can release its
+ * outstanding-bytes accounting
+ * @param entryLocation position of the entry within its segment; compared
+ * against ReplayPosition.position
+ * @param clr the replayer whose filter, replay positions, counters and
+ * recovered-keyspace set the task reads and updates
+ * @return a future whose value is {@code serializedSize} once the task has run
+ */
+ protected Future<Integer> initiateMutation(final Mutation mutation,
+ final long segmentId,
+ final int serializedSize,
+ final long entryLocation,
+ final CommitLogReplayer clr)
+ {
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+ return;
+ if (clr.pointInTimeExceeded(mutation))
+ return;
+
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+
+ // Rebuild the mutation, omitting column families that
+ // a) the user has requested that we ignore,
+ // b) have already been flushed,
+ // or c) are part of a cf that was dropped.
+ // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
+ Mutation newMutation = null;
+ for (PartitionUpdate update : clr.replayFilter.filter(mutation))
+ {
+ if (Schema.instance.getCF(update.metadata().cfId) == null)
+ continue; // dropped
+
+ // NOTE(review): cfPositions.get returns null for a cfId absent from the
+ // map, which would NPE on rp.segment below; confirm the map always
+ // covers every live table.
+ ReplayPosition rp = clr.cfPositions.get(update.metadata().cfId);
+
+ // replay if current segment is newer than last flushed one or,
+ // if it is the last known segment, if we are after the replay position
+ if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
+ {
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(update);
+ // Counts replayed partition updates, not whole mutations.
+ clr.replayedCount.incrementAndGet();
+ }
+ }
+ // Apply only if at least one partition update survived the filtering above.
+ if (newMutation != null)
+ {
+ assert !newMutation.isEmpty();
+ Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation);
+ // Remember the keyspace so its tables are flushed once replay finishes.
+ clr.keyspacesRecovered.add(keyspace);
+ }
+ }
+ };
+ // submit(Runnable, result): the future yields serializedSize rather than anything
+ // computed by the task; the replay loop subtracts it from pendingMutationBytes.
+ return StageManager.getStage(Stage.MUTATION).submit(runnable, serializedSize);
+ }
+ }
+
CommitLogReplayer(CommitLog commitLog, ReplayPosition globalPosition, Map<UUID, ReplayPosition> cfPositions, ReplayFilter replayFilter)
{
this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
- this.futures = new ArrayList<Future<?>>();
+ this.futures = new ArrayDeque<Future<Integer>>();
this.buffer = new byte[4096];
this.uncompressedBuffer = new byte[4096];
this.invalidMutations = new HashMap<UUID, AtomicInteger>();
@@ -163,6 +227,8 @@ public class CommitLogReplayer
// flush replayed keyspaces
futures.clear();
boolean flushingSystem = false;
+
+ List<Future<?>> futures = new ArrayList<Future<?>>();
for (Keyspace keyspace : keyspacesRecovered)
{
if (keyspace.getName().equals(SystemKeyspace.NAME))
@@ -176,6 +242,7 @@ public class CommitLogReplayer
futures.add(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceFlush());
FBUtilities.waitOnFutures(futures);
+
return replayedCount.get();
}
@@ -565,53 +632,19 @@ public class CommitLogReplayer
if (logger.isTraceEnabled())
logger.trace("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
- Runnable runnable = new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
- {
- if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
- return;
- if (pointInTimeExceeded(mutation))
- return;
-
- final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-
- // Rebuild the mutation, omitting column families that
- // a) the user has requested that we ignore,
- // b) have already been flushed,
- // or c) are part of a cf that was dropped.
- // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- Mutation newMutation = null;
- for (PartitionUpdate update : replayFilter.filter(mutation))
- {
- if (Schema.instance.getCF(update.metadata().cfId) == null)
- continue; // dropped
-
- ReplayPosition rp = cfPositions.get(update.metadata().cfId);
-
- // replay if current segment is newer than last flushed one or,
- // if it is the last known segment, if we are after the replay position
- if (desc.id > rp.segment || (desc.id == rp.segment && entryLocation > rp.position))
- {
- if (newMutation == null)
- newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
- newMutation.add(update);
- replayedCount.incrementAndGet();
- }
- }
- if (newMutation != null)
- {
- assert !newMutation.isEmpty();
- Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation);
- keyspacesRecovered.add(keyspace);
- }
- }
- };
- futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
- if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
+ pendingMutationBytes += size;
+ futures.offer(mutationInitiator.initiateMutation(mutation,
+ desc.id,
+ size,
+ entryLocation,
+ this));
+ //If there are finished mutations, or too many outstanding bytes/mutations
+ //drain the futures in the queue
+ while (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT
+ || pendingMutationBytes > MAX_OUTSTANDING_REPLAY_BYTES
+ || (!futures.isEmpty() && futures.peek().isDone()))
{
- FBUtilities.waitOnFutures(futures);
- futures.clear();
+ pendingMutationBytes -= FBUtilities.waitOnFuture(futures.poll());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c41a9ac/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index baf9466..788757c 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -20,7 +20,12 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.util.Date;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +50,71 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogArchiver;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer;
@RunWith(OrderedJUnit4ClassRunner.class)
public class RecoveryManagerTest
{
private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
+ // Permits handed out by the test to let wrapped replay futures complete. While it
+ // has no permits, get() blocks and isDone() reports false.
+ static final Semaphore blocker = new Semaphore(0);
+ // Released every time replay calls get() on a wrapped future, so the test can
+ // observe that replay has started waiting.
+ static final Semaphore blocked = new Semaphore(0);
+ // NOTE(review): appears unused in the visible hunks; testRecoverBlocksOnBytesOutstanding
+ // declares a local variable with the same name that shadows it.
+ static CommitLogReplayer.MutationInitiator originalInitiator = null;
+ // Initiator that delegates to the real implementation but wraps each returned future,
+ // so the replayer cannot drain it until the test releases permits on blocker.
+ static final CommitLogReplayer.MutationInitiator mockInitiator = new CommitLogReplayer.MutationInitiator()
+ {
+ @Override
+ protected Future<Integer> initiateMutation(final Mutation mutation,
+ final long segmentId,
+ final int serializedSize,
+ final long entryLocation,
+ final CommitLogReplayer clr)
+ {
+ final Future<Integer> toWrap = super.initiateMutation(mutation,
+ segmentId,
+ serializedSize,
+ entryLocation,
+ clr);
+ return new Future<Integer>()
+ {
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCancelled()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // Reports "not done" until the test has released blocker, so the replayer's
+ // "drain already-finished futures" check cannot bypass the byte limit.
+ @Override
+ public boolean isDone()
+ {
+ return blocker.availablePermits() > 0 && toWrap.isDone();
+ }
+
+ // Signals that replay is waiting, then blocks until the test releases a
+ // blocker permit. NOTE(review): the println looks like leftover debugging output.
+ @Override
+ public Integer get() throws InterruptedException, ExecutionException
+ {
+ System.out.println("Got blocker once");
+ blocked.release();
+ blocker.acquire();
+ return toWrap.get();
+ }
+
+ // Timed variant of the same handshake. NOTE(review): the boolean returned by
+ // tryAcquire is ignored, so on timeout this still falls through to toWrap.get.
+ @Override
+ public Integer get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException
+ {
+ blocked.release();
+ blocker.tryAcquire(1, timeout, unit);
+ return toWrap.get(timeout, unit);
+ }
+
+ };
+ }
+ };
private static final String KEYSPACE1 = "RecoveryManagerTest1";
private static final String CF_STANDARD1 = "Standard1";
@@ -86,6 +151,78 @@ public class RecoveryManagerTest
}
@Test
+ public void testRecoverBlocksOnBytesOutstanding() throws Exception
+ {
+ // Checks that commit log replay blocks once outstanding mutation bytes exceed
+ // CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES. The limit is forced to 1 byte, and
+ // mockInitiator keeps every replay future pending until blocker is released. The
+ // replay thread must therefore stall, then finish and restore both rows once unblocked.
+ long originalMaxOutstanding = CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES;
+ CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = 1;
+ // NOTE(review): this local shadows the static field of the same name on the test class.
+ CommitLogReplayer.MutationInitiator originalInitiator = CommitLogReplayer.mutationInitiator;
+ CommitLogReplayer.mutationInitiator = mockInitiator;
+ try
+ {
+ // NOTE(review): the exact meaning of resetUnsafe(true) vs resetUnsafe(false) is
+ // defined in CommitLog, which is not shown here.
+ CommitLog.instance.resetUnsafe(true);
+ Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+ Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
+
+ // Apply one row to a table in each keyspace under the same key; replay must restore both.
+ UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+ .clustering("col1").add("val", "1")
+ .build());
+
+ UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+ .clustering("col2").add("val", "1")
+ .build());
+
+ // Discard the tables' data so the reads below see nothing and only replay can bring the rows back.
+ keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
+ keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
+
+ DecoratedKey dk = Util.dk("keymulti");
+ Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).isEmpty());
+ Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
+
+ // Replay runs on a separate thread because it is expected to block; any failure is
+ // captured in err and rethrown below. Presumably resetUnsafe(false) is what triggers
+ // replay here (it is the call expected to reach the wrapped futures) — TODO confirm.
+ final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+ Thread t = new Thread() {
+ @Override
+ public void run()
+ {
+ try
+ {
+ CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
+ }
+ catch (Throwable t)
+ {
+ err.set(t);
+ }
+ }
+ };
+ t.start();
+ // Wait until replay calls get() on a wrapped future, then check (after a short,
+ // timing-based pause) that the replay thread is still stuck there.
+ Assert.assertTrue(blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
+ Thread.sleep(100);
+ Assert.assertTrue(t.isAlive());
+ // Hand out enough permits that every pending and later get() on a wrapped future returns.
+ blocker.release(Integer.MAX_VALUE);
+ t.join(20 * 1000);
+
+ if (err.get() != null)
+ throw new RuntimeException(err.get());
+
+ // If replay is still running, dump the thread's stack to help diagnose the hang before failing.
+ if (t.isAlive())
+ {
+ Throwable toPrint = new Throwable();
+ toPrint.setStackTrace(Thread.getAllStackTraces().get(t));
+ toPrint.printStackTrace(System.out);
+ }
+ Assert.assertFalse(t.isAlive());
+
+ // Both rows must have been restored by replay.
+ Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
+ Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
+ }
+ finally
+ {
+ // Restore the static hooks so later tests use the real initiator and byte limit.
+ CommitLogReplayer.mutationInitiator = originalInitiator;
+ CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = originalMaxOutstanding;
+ }
+ }
+
+
+ @Test
public void testOne() throws IOException
{
CommitLog.instance.resetUnsafe(true);