You are viewing a plain text version of this content. The original page linked to the canonical version, but that hyperlink is not preserved in this plain-text rendering.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/12/12 14:22:34 UTC

[1/2] cassandra git commit: Ensure memtable flush cannot expire commit log entries from its future

Repository: cassandra
Updated Branches:
  refs/heads/trunk f35e9c255 -> 480cd39c2


Ensure memtable flush cannot expire commit log entries from its future

patch by benedict; reviewed by aweisburg for CASSANDRA-8383


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e3f6151
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e3f6151
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e3f6151

Branch: refs/heads/trunk
Commit: 7e3f6151abc96ceb1a2cac1bc117324c4de630e9
Parents: 5c69584
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 13:20:19 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 13:20:19 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 34 ++++++++---
 .../org/apache/cassandra/db/DataTracker.java    |  5 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 62 ++++++++++++--------
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 5 files changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4cb6fb..18efc7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
  * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
  * Remove tmplink files for offline compactions (CASSANDRA-8321)
  * Reduce maxHintsInProgress (CASSANDRA-8415)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 99940b7..08f7969 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import javax.management.*;
 
@@ -910,12 +911,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        volatile ReplayPosition lastReplayPosition;
+        final ReplayPosition lastReplayPosition;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier)
+        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
         {
             this.writeBarrier = writeBarrier;
             this.flushSecondaryIndexes = flushSecondaryIndexes;
+            this.lastReplayPosition = lastReplayPosition;
         }
 
         public void run()
@@ -995,19 +997,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             memtables = new ArrayList<>();
 
             // submit flushes for the memtable for any indexed sub-cfses, and our own
-            final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+            AtomicReference<ReplayPosition> lastReplayPositionHolder = new AtomicReference<>();
             for (ColumnFamilyStore cfs : concatWithIndexes())
             {
                 // switch all memtables, regardless of their dirty status, setting the barrier
                 // so that we can reach a coordinated decision about cleanliness once they
                 // are no longer possible to be modified
                 Memtable mt = cfs.data.switchMemtable(truncate);
-                mt.setDiscarding(writeBarrier, minReplayPosition);
+                mt.setDiscarding(writeBarrier, lastReplayPositionHolder);
                 memtables.add(mt);
             }
 
+            // we now attempt to define the lastReplayPosition; we do this by grabbing the current limit from the CL
+            // and attempting to set the holder to this value. at the same time all writes to the memtables are
+            // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry,
+            // so that we know all operations prior to the position have not reached it yet
+            ReplayPosition lastReplayPosition;
+            while (true)
+            {
+                lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
+                ReplayPosition currentLast = lastReplayPositionHolder.get();
+                if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0)
+                    && lastReplayPositionHolder.compareAndSet(currentLast, lastReplayPosition))
+                    break;
+            }
+
+            // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
+            // since this happens after wiring up the lastReplayPosition, we also know all operations with earlier
+            // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier);
+            postFlush = new PostFlush(!truncate, writeBarrier, lastReplayPosition);
         }
 
         public void run()
@@ -1059,7 +1078,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.lastReplayPosition = memtables.get(0).getLastReplayPosition();
             postFlush.latch.countDown();
         }
     }
@@ -1131,8 +1149,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         long start = System.nanoTime();
 
-        Memtable mt = data.getMemtableFor(opGroup);
-        final long timeDelta = mt.put(key, columnFamily, indexer, opGroup, replayPosition);
+        Memtable mt = data.getMemtableFor(opGroup, replayPosition);
+        final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
         maybeUpdateRowCache(key);
         metric.writeLatency.addNano(System.nanoTime() - start);
         if(timeDelta < Long.MAX_VALUE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7df2b75..d086b47 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +55,7 @@ public class DataTracker
     }
 
     // get the Memtable that the ordered writeOp should be directed to
-    public Memtable getMemtableFor(OpOrder.Group opGroup)
+    public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
         // since any new memtables appended to the list after we fetch it will be for operations started
         // after us, we can safely assume that we will always find the memtable that 'accepts' us;
@@ -65,7 +66,7 @@ public class DataTracker
         // assign operations to a memtable that was retired/queued before we started)
         for (Memtable memtable : view.get().liveMemtables)
         {
-            if (memtable.accepts(opGroup))
+            if (memtable.accepts(opGroup, replayPosition))
                 return memtable;
         }
         throw new AssertionError(view.get().liveMemtables.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 3ae5da4..eb04bea 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -61,10 +61,17 @@ public class Memtable
     // the write barrier for directing writes to this memtable during a switch
     private volatile OpOrder.Barrier writeBarrier;
     // the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable
-    private final AtomicReference<ReplayPosition> lastReplayPosition = new AtomicReference<>();
+    private volatile AtomicReference<ReplayPosition> lastReplayPosition;
     // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
     private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
 
+    public static final class LastReplayPosition extends ReplayPosition
+    {
+        public LastReplayPosition(ReplayPosition copy) {
+            super(copy.segment, copy.position);
+        }
+    }
+
     // We index the memtable by RowPosition only for the purpose of being able
     // to select key range using Token.KeyBound. However put() ensures that we
     // actually only store DecoratedKey.
@@ -101,10 +108,10 @@ public class Memtable
         return currentOperations.get();
     }
 
-    void setDiscarding(OpOrder.Barrier writeBarrier, ReplayPosition minLastReplayPosition)
+    void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition)
     {
         assert this.writeBarrier == null;
-        this.lastReplayPosition.set(minLastReplayPosition);
+        this.lastReplayPosition = lastReplayPosition;
         this.writeBarrier = writeBarrier;
         allocator.setDiscarding();
     }
@@ -114,10 +121,34 @@ public class Memtable
         allocator.setDiscarded();
     }
 
-    public boolean accepts(OpOrder.Group opGroup)
+    // decide if this memtable should take the write, or if it should go to the next memtable
+    public boolean accepts(OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
+        // if the barrier hasn't been set yet, then this memtable is still taking ALL writes
         OpOrder.Barrier barrier = this.writeBarrier;
-        return barrier == null || barrier.isAfter(opGroup);
+        if (barrier == null)
+            return true;
+        // if the barrier has been set, but is in the past, we are definitely destined for a future memtable
+        if (!barrier.isAfter(opGroup))
+            return false;
+        // if we aren't durable we are directed only by the barrier
+        if (replayPosition == null)
+            return true;
+        while (true)
+        {
+            // otherwise we check if we are in the past/future wrt the CL boundary;
+            // if the boundary hasn't been finalised yet, we simply update it to the max of
+            // its current value and ours; if it HAS been finalised, we simply accept its judgement
+            // this permits us to coordinate a safe boundary, as the boundary choice is made
+            // atomically wrt our max() maintenance, so an operation cannot sneak into the past
+            ReplayPosition currentLast = lastReplayPosition.get();
+            if (currentLast instanceof LastReplayPosition)
+                return currentLast.compareTo(replayPosition) >= 0;
+            if (currentLast != null && currentLast.compareTo(replayPosition) >= 0)
+                return true;
+            if (lastReplayPosition.compareAndSet(currentLast, replayPosition))
+                return true;
+        }
     }
 
     public boolean isLive()
@@ -150,22 +181,8 @@ public class Memtable
      *
      * replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null
      */
-    long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
+    long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup)
     {
-        if (replayPosition != null && writeBarrier != null)
-        {
-            // if the writeBarrier is set, we want to maintain lastReplayPosition; this is an optimisation to avoid
-            // casing it for every write, but still ensure it is correct when writeBarrier.await() completes.
-            while (true)
-            {
-                ReplayPosition last = lastReplayPosition.get();
-                if (last.compareTo(replayPosition) >= 0)
-                    break;
-                if (lastReplayPosition.compareAndSet(last, replayPosition))
-                    break;
-            }
-        }
-
         AtomicBTreeColumns previous = rows.get(key);
 
         if (previous == null)
@@ -274,11 +291,6 @@ public class Memtable
         return creationTime;
     }
 
-    public ReplayPosition getLastReplayPosition()
-    {
-        return lastReplayPosition.get();
-    }
-
     class FlushRunnable extends DiskAwareRunnable
     {
         private final ReplayPosition context;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index ee9ca14..9b51a33 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -158,8 +158,8 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * @return a Future representing a ReplayPosition such that when it is ready,
-     * all Allocations created prior to the getContext call will be written to the log
+     * @return a ReplayPosition which, if >= one returned from add(), implies add() was started
+     * (but not necessarily finished) prior to this call
      */
     public ReplayPosition getContext()
     {


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/DataTracker.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/480cd39c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/480cd39c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/480cd39c

Branch: refs/heads/trunk
Commit: 480cd39c28d55a0dd69c8b654b9976d93c98003e
Parents: f35e9c2 7e3f615
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Dec 12 13:22:18 2014 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Dec 12 13:22:18 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 34 ++++++++---
 .../org/apache/cassandra/db/DataTracker.java    |  5 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 62 ++++++++++++--------
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 5 files changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 93823c3,18efc7e..06cd2ca
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,45 -1,5 +1,46 @@@
 +3.0
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 +
 +
  2.1.3
+  * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
   * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
   * Remove tmplink files for offline compactions (CASSANDRA-8321)
   * Reduce maxHintsInProgress (CASSANDRA-8415)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/DataTracker.java
index 5eda67e,d086b47..16cf62d
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@@ -24,7 -24,7 +24,8 @@@ import java.util.concurrent.atomic.Atom
  
  import com.google.common.base.Predicate;
  import com.google.common.collect.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.db.commitlog.ReplayPosition;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/480cd39c/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------