You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 14:40:51 UTC
cassandra git commit: Ensure memtable flush cannot expire commit log entries from its future
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 5c6958462 -> 7e3f6151a
Ensure memtable flush cannot expire commit log entries from its future
patch by benedict; reviewed by aweisburg for CASSANDRA-8383
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e3f6151
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e3f6151
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e3f6151
Branch: refs/heads/cassandra-2.1
Commit: 7e3f6151abc96ceb1a2cac1bc117324c4de630e9
Parents: 5c69584
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 13:20:19 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 13:20:19 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 34 ++++++++---
.../org/apache/cassandra/db/DataTracker.java | 5 +-
src/java/org/apache/cassandra/db/Memtable.java | 62 ++++++++++++--------
.../cassandra/db/commitlog/CommitLog.java | 4 +-
5 files changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4cb6fb..18efc7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
* Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
* Remove tmplink files for offline compactions (CASSANDRA-8321)
* Reduce maxHintsInProgress (CASSANDRA-8415)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 99940b7..08f7969 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.management.*;
@@ -910,12 +911,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- volatile ReplayPosition lastReplayPosition;
+ final ReplayPosition lastReplayPosition;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier)
+ private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
{
this.writeBarrier = writeBarrier;
this.flushSecondaryIndexes = flushSecondaryIndexes;
+ this.lastReplayPosition = lastReplayPosition;
}
public void run()
@@ -995,19 +997,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
memtables = new ArrayList<>();
// submit flushes for the memtable for any indexed sub-cfses, and our own
- final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+ AtomicReference<ReplayPosition> lastReplayPositionHolder = new AtomicReference<>();
for (ColumnFamilyStore cfs : concatWithIndexes())
{
// switch all memtables, regardless of their dirty status, setting the barrier
// so that we can reach a coordinated decision about cleanliness once they
// are no longer possible to be modified
Memtable mt = cfs.data.switchMemtable(truncate);
- mt.setDiscarding(writeBarrier, minReplayPosition);
+ mt.setDiscarding(writeBarrier, lastReplayPositionHolder);
memtables.add(mt);
}
+ // we now attempt to define the lastReplayPosition; we do this by grabbing the current limit from the CL
+ // and attempting to set the holder to this value. at the same time all writes to the memtables are
+ // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
+ // so that we know all operations prior to the position have not reached it yet
+ ReplayPosition lastReplayPosition;
+ while (true)
+ {
+ lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
+ ReplayPosition currentLast = lastReplayPositionHolder.get();
+ if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
+ && lastReplayPositionHolder.compareAndSet(currentLast, lastReplayPosition))
+ break;
+ }
+
+ // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
+ // since this happens after wiring up the lastReplayPosition, we also know all operations with earlier
+ // replay positions have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
- postFlush = new PostFlush(!truncate, writeBarrier);
+ postFlush = new PostFlush(!truncate, writeBarrier, lastReplayPosition);
}
public void run()
@@ -1059,7 +1078,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.lastReplayPosition = memtables.get(0).getLastReplayPosition();
postFlush.latch.countDown();
}
}
@@ -1131,8 +1149,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
long start = System.nanoTime();
- Memtable mt = data.getMemtableFor(opGroup);
- final long timeDelta = mt.put(key, columnFamily, indexer, opGroup, replayPosition);
+ Memtable mt = data.getMemtableFor(opGroup, replayPosition);
+ final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
maybeUpdateRowCache(key);
metric.writeLatency.addNano(System.nanoTime() - start);
if(timeDelta < Long.MAX_VALUE)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7df2b75..d086b47 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class DataTracker
}
// get the Memtable that the ordered writeOp should be directed to
- public Memtable getMemtableFor(OpOrder.Group opGroup)
+ public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
{
// since any new memtables appended to the list after we fetch it will be for operations started
// after us, we can safely assume that we will always find the memtable that 'accepts' us;
@@ -65,7 +66,7 @@ public class DataTracker
// assign operations to a memtable that was retired/queued before we started)
for (Memtable memtable : view.get().liveMemtables)
{
- if (memtable.accepts(opGroup))
+ if (memtable.accepts(opGroup, replayPosition))
return memtable;
}
throw new AssertionError(view.get().liveMemtables.toString());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 3ae5da4..eb04bea 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -61,10 +61,17 @@ public class Memtable
// the write barrier for directing writes to this memtable during a switch
private volatile OpOrder.Barrier writeBarrier;
// the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable
- private final AtomicReference<ReplayPosition> lastReplayPosition = new AtomicReference<>();
+ private volatile AtomicReference<ReplayPosition> lastReplayPosition;
// the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+ public static final class LastReplayPosition extends ReplayPosition
+ {
+ public LastReplayPosition(ReplayPosition copy) {
+ super(copy.segment, copy.position);
+ }
+ }
+
// We index the memtable by RowPosition only for the purpose of being able
// to select key range using Token.KeyBound. However put() ensures that we
// actually only store DecoratedKey.
@@ -101,10 +108,10 @@ public class Memtable
return currentOperations.get();
}
- void setDiscarding(OpOrder.Barrier writeBarrier, ReplayPosition minLastReplayPosition)
+ void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
{
assert this.writeBarrier == null;
- this.lastReplayPosition.set(minLastReplayPosition);
+ this.lastReplayPosition = lastReplayPosition;
this.writeBarrier = writeBarrier;
allocator.setDiscarding();
}
@@ -114,10 +121,34 @@ public class Memtable
allocator.setDiscarded();
}
- public boolean accepts(OpOrder.Group opGroup)
+ // decide if this memtable should take the write, or if it should go to the next memtable
+ public boolean accepts(OpOrder.Group opGroup, ReplayPosition replayPosition)
{
+ // if the barrier hasn't been set yet, then this memtable is still taking ALL writes
OpOrder.Barrier barrier = this.writeBarrier;
- return barrier == null || barrier.isAfter(opGroup);
+ if (barrier == null)
+ return true;
+ // if the barrier has been set, but is in the past, we are definitely destined for a future memtable
+ if (!barrier.isAfter(opGroup))
+ return false;
+ // if we aren't durable we are directed only by the barrier
+ if (replayPosition == null)
+ return true;
+ while (true)
+ {
+ // otherwise we check if we are in the past/future wrt the CL boundary;
+ // if the boundary hasn't been finalised yet, we simply update it to the max of
+ // its current value and ours; if it HAS been finalised, we simply accept its judgement
+ // this permits us to coordinate a safe boundary, as the boundary choice is made
+ // atomically wrt our max() maintenance, so an operation cannot sneak into the past
+ ReplayPosition currentLast = lastReplayPosition.get();
+ if (currentLast instanceof LastReplayPosition)
+ return currentLast.compareTo(replayPosition) >= 0;
+ if (currentLast != null && currentLast.compareTo(replayPosition) >= 0)
+ return true;
+ if (lastReplayPosition.compareAndSet(currentLast, replayPosition))
+ return true;
+ }
}
public boolean isLive()
@@ -150,22 +181,8 @@ public class Memtable
*
* replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null
*/
- long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
+ long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup)
{
- if (replayPosition != null && writeBarrier != null)
- {
- // if the writeBarrier is set, we want to maintain lastReplayPosition; this is an optimisation to avoid
- // casing it for every write, but still ensure it is correct when writeBarrier.await() completes.
- while (true)
- {
- ReplayPosition last = lastReplayPosition.get();
- if (last.compareTo(replayPosition) >= 0)
- break;
- if (lastReplayPosition.compareAndSet(last, replayPosition))
- break;
- }
- }
-
AtomicBTreeColumns previous = rows.get(key);
if (previous == null)
@@ -274,11 +291,6 @@ public class Memtable
return creationTime;
}
- public ReplayPosition getLastReplayPosition()
- {
- return lastReplayPosition.get();
- }
-
class FlushRunnable extends DiskAwareRunnable
{
private final ReplayPosition context;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index ee9ca14..9b51a33 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -158,8 +158,8 @@ public class CommitLog implements CommitLogMBean
}
/**
- * @return a Future representing a ReplayPosition such that when it is ready,
- * all Allocations created prior to the getContext call will be written to the log
+ * @return a ReplayPosition which, if >= one returned from add(), implies add() was started
+ * (but not necessarily finished) prior to this call
*/
public ReplayPosition getContext()
{