You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 14:22:34 UTC
[1/2] cassandra git commit: Ensure memtable flush cannot expire commit log entries from its future
Repository: cassandra
Updated Branches:
refs/heads/trunk f35e9c255 -> 480cd39c2
Ensure memtable flush cannot expire commit log entries from its future
patch by benedict; reviewed by aweisburg for CASSANDRA-8383
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e3f6151
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e3f6151
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e3f6151
Branch: refs/heads/trunk
Commit: 7e3f6151abc96ceb1a2cac1bc117324c4de630e9
Parents: 5c69584
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 13:20:19 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 13:20:19 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 34 ++++++++---
.../org/apache/cassandra/db/DataTracker.java | 5 +-
src/java/org/apache/cassandra/db/Memtable.java | 62 ++++++++++++--------
.../cassandra/db/commitlog/CommitLog.java | 4 +-
5 files changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4cb6fb..18efc7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
* Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
* Remove tmplink files for offline compactions (CASSANDRA-8321)
* Reduce maxHintsInProgress (CASSANDRA-8415)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 99940b7..08f7969 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.management.*;
@@ -910,12 +911,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- volatile ReplayPosition lastReplayPosition;
+ final ReplayPosition lastReplayPosition;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier)
+ private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
{
this.writeBarrier = writeBarrier;
this.flushSecondaryIndexes = flushSecondaryIndexes;
+ this.lastReplayPosition = lastReplayPosition;
}
public void run()
@@ -995,19 +997,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
memtables = new ArrayList<>();
// submit flushes for the memtable for any indexed sub-cfses, and our own
- final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+ AtomicReference<ReplayPosition> lastReplayPositionHolder = new AtomicReference<>();
for (ColumnFamilyStore cfs : concatWithIndexes())
{
// switch all memtables, regardless of their dirty status, setting the barrier
// so that we can reach a coordinated decision about cleanliness once they
// are no longer possible to be modified
Memtable mt = cfs.data.switchMemtable(truncate);
- mt.setDiscarding(writeBarrier, minReplayPosition);
+ mt.setDiscarding(writeBarrier, lastReplayPositionHolder);
memtables.add(mt);
}
+ // we now attempt to define the lastReplayPosition; we do this by grabbing the current limit from the CL
+ // and attempting to set the holder to this value. at the same time all writes to the memtables are
+ // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
+ // so that we know all operations prior to the position have not reached it yet
+ ReplayPosition lastReplayPosition;
+ while (true)
+ {
+ lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
+ ReplayPosition currentLast = lastReplayPositionHolder.get();
+ if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
+ && lastReplayPositionHolder.compareAndSet(currentLast, lastReplayPosition))
+ break;
+ }
+
+ // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
+ // since this happens after wiring up the lastReplayPosition, we also know all operations with earlier
+ // replay positions have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
- postFlush = new PostFlush(!truncate, writeBarrier);
+ postFlush = new PostFlush(!truncate, writeBarrier, lastReplayPosition);
}
public void run()
@@ -1059,7 +1078,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.lastReplayPosition = memtables.get(0).getLastReplayPosition();
postFlush.latch.countDown();
}
}
@@ -1131,8 +1149,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
long start = System.nanoTime();
- Memtable mt = data.getMemtableFor(opGroup);
- final long timeDelta = mt.put(key, columnFamily, indexer, opGroup, replayPosition);
+ Memtable mt = data.getMemtableFor(opGroup, replayPosition);
+ final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
maybeUpdateRowCache(key);
metric.writeLatency.addNano(System.nanoTime() - start);
if(timeDelta < Long.MAX_VALUE)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7df2b75..d086b47 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class DataTracker
}
// get the Memtable that the ordered writeOp should be directed to
- public Memtable getMemtableFor(OpOrder.Group opGroup)
+ public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
{
// since any new memtables appended to the list after we fetch it will be for operations started
// after us, we can safely assume that we will always find the memtable that 'accepts' us;
@@ -65,7 +66,7 @@ public class DataTracker
// assign operations to a memtable that was retired/queued before we started)
for (Memtable memtable : view.get().liveMemtables)
{
- if (memtable.accepts(opGroup))
+ if (memtable.accepts(opGroup, replayPosition))
return memtable;
}
throw new AssertionError(view.get().liveMemtables.toString());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 3ae5da4..eb04bea 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -61,10 +61,17 @@ public class Memtable
// the write barrier for directing writes to this memtable during a switch
private volatile OpOrder.Barrier writeBarrier;
// the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable
- private final AtomicReference<ReplayPosition> lastReplayPosition = new AtomicReference<>();
+ private volatile AtomicReference<ReplayPosition> lastReplayPosition;
// the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+ public static final class LastReplayPosition extends ReplayPosition
+ {
+ public LastReplayPosition(ReplayPosition copy) {
+ super(copy.segment, copy.position);
+ }
+ }
+
// We index the memtable by RowPosition only for the purpose of being able
// to select key range using Token.KeyBound. However put() ensures that we
// actually only store DecoratedKey.
@@ -101,10 +108,10 @@ public class Memtable
return currentOperations.get();
}
- void setDiscarding(OpOrder.Barrier writeBarrier, ReplayPosition minLastReplayPosition)
+ void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
{
assert this.writeBarrier == null;
- this.lastReplayPosition.set(minLastReplayPosition);
+ this.lastReplayPosition = lastReplayPosition;
this.writeBarrier = writeBarrier;
allocator.setDiscarding();
}
@@ -114,10 +121,34 @@ public class Memtable
allocator.setDiscarded();
}
- public boolean accepts(OpOrder.Group opGroup)
+ // decide if this memtable should take the write, or if it should go to the next memtable
+ public boolean accepts(OpOrder.Group opGroup, ReplayPosition replayPosition)
{
+ // if the barrier hasn't been set yet, then this memtable is still taking ALL writes
OpOrder.Barrier barrier = this.writeBarrier;
- return barrier == null || barrier.isAfter(opGroup);
+ if (barrier == null)
+ return true;
+ // if the barrier has been set, but is in the past, we are definitely destined for a future memtable
+ if (!barrier.isAfter(opGroup))
+ return false;
+ // if we aren't durable we are directed only by the barrier
+ if (replayPosition == null)
+ return true;
+ while (true)
+ {
+ // otherwise we check if we are in the past/future wrt the CL boundary;
+ // if the boundary hasn't been finalised yet, we simply update it to the max of
+ // its current value and ours; if it HAS been finalised, we simply accept its judgement
+ // this permits us to coordinate a safe boundary, as the boundary choice is made
+ // atomically wrt our max() maintenance, so an operation cannot sneak into the past
+ ReplayPosition currentLast = lastReplayPosition.get();
+ if (currentLast instanceof LastReplayPosition)
+ return currentLast.compareTo(replayPosition) >= 0;
+ if (currentLast != null && currentLast.compareTo(replayPosition) >= 0)
+ return true;
+ if (lastReplayPosition.compareAndSet(currentLast, replayPosition))
+ return true;
+ }
}
public boolean isLive()
@@ -150,22 +181,8 @@ public class Memtable
*
* replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null
*/
- long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
+ long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup)
{
- if (replayPosition != null && writeBarrier != null)
- {
- // if the writeBarrier is set, we want to maintain lastReplayPosition; this is an optimisation to avoid
- // casing it for every write, but still ensure it is correct when writeBarrier.await() completes.
- while (true)
- {
- ReplayPosition last = lastReplayPosition.get();
- if (last.compareTo(replayPosition) >= 0)
- break;
- if (lastReplayPosition.compareAndSet(last, replayPosition))
- break;
- }
- }
-
AtomicBTreeColumns previous = rows.get(key);
if (previous == null)
@@ -274,11 +291,6 @@ public class Memtable
return creationTime;
}
- public ReplayPosition getLastReplayPosition()
- {
- return lastReplayPosition.get();
- }
-
class FlushRunnable extends DiskAwareRunnable
{
private final ReplayPosition context;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index ee9ca14..9b51a33 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -158,8 +158,8 @@ public class CommitLog implements CommitLogMBean
}
/**
- * @return a Future representing a ReplayPosition such that when it is ready,
- * all Allocations created prior to the getContext call will be written to the log
+ * @return a ReplayPosition which, if >= one returned from add(), implies add() was started
+ * (but not necessarily finished) prior to this call
*/
public ReplayPosition getContext()
{
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/DataTracker.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/480cd39c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/480cd39c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/480cd39c
Branch: refs/heads/trunk
Commit: 480cd39c28d55a0dd69c8b654b9976d93c98003e
Parents: f35e9c2 7e3f615
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 13:22:18 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 13:22:18 2014 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 34 ++++++++---
.../org/apache/cassandra/db/DataTracker.java | 5 +-
src/java/org/apache/cassandra/db/Memtable.java | 62 ++++++++++++--------
.../cassandra/db/commitlog/CommitLog.java | 4 +-
5 files changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 93823c3,18efc7e..06cd2ca
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,45 -1,5 +1,46 @@@
+3.0
+ * Support for user-defined aggregation functions (CASSANDRA-8053)
+ * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
+ * Refactor SelectStatement, return IN results in natural order instead
+ of IN value list order (CASSANDRA-7981)
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
+ * Fix aggregate fn results on empty selection, result column name,
+ and cqlsh parsing (CASSANDRA-8229)
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208)
+
+
2.1.3
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
* Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
* Remove tmplink files for offline compactions (CASSANDRA-8321)
* Reduce maxHintsInProgress (CASSANDRA-8415)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/DataTracker.java
index 5eda67e,d086b47..16cf62d
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@@ -24,7 -24,7 +24,8 @@@ import java.util.concurrent.atomic.Atom
import com.google.common.base.Predicate;
import com.google.common.collect.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------