You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/12/17 11:34:00 UTC

[3/3] git commit: Fix potential infinite loop when reloading CFS

Fix potential infinite loop when reloading CFS

patch by slebresne; reviewed by jbellis for CASSANDRA-5064


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9b217e46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9b217e46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9b217e46

Branch: refs/heads/trunk
Commit: 9b217e4676187cd6b4f05b6724c199dd15e03f73
Parents: ab071c5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Dec 17 11:29:15 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Dec 17 11:29:15 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   99 +++++----------
 src/java/org/apache/cassandra/db/DefsTable.java    |    5 +-
 src/java/org/apache/cassandra/db/Memtable.java     |   12 --
 src/java/org/apache/cassandra/db/Table.java        |    6 +-
 .../apache/cassandra/service/StorageService.java   |    6 +-
 .../org/apache/cassandra/streaming/StreamOut.java  |    6 +-
 7 files changed, 36 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a9df1b..327d427 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
-1.2.0
+1.2.0-rc2
  * cqlsh: add DESCRIBE KEYSPACES command (CASSANDRA-5060)
+ * Fix potential infinite loop when reloading CFS (CASSANDRA-5064)
 Merged from 1.1:
  * fix temporarily missing schema after upgrade from pre-1.1.5 (CASSANDRA-5061)
  * Fix ALTER TABLE overriding compression options with defaults

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9cb4c66..364565f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -81,10 +81,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
     /*
-     * maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
+     * switchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
      * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
      *
-     * There are two other things that maybeSwitchMemtable does.
+     * There are two other things that switchMemtable does.
      * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
      * and it's been added as an SSTableReader to ssTables_.  Second, it adds an entry to commitLogUpdater
      * that waits for the flush to complete, then calls onMemtableFlush.  This allows multiple flushes
@@ -157,34 +157,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         indexManager.reload();
 
         // If the CF comparator has changed, we need to change the memtable,
-        // because the old one still aliases the previous comparator. We don't
-        // call forceFlush() because it can skip the switch if the memtable is
-        // clean, which we don't want here. Also, because there can be a race
-        // between the time we acquire the current memtable and we flush it
-        // (another thread can have flushed it first), we attempt the switch
-        // until we know the memtable has the current comparator.
-        try
-        {
-            while (true)
-            {
-                AbstractType comparator = metadata.comparator;
-                Memtable memtable = getMemtableThreadSafe();
-                if (memtable.initialComparator == comparator)
-                    break;
-
-                Future future = maybeSwitchMemtable(getMemtableThreadSafe(), true);
-                if (future != null)
-                    future.get();
-            }
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        // because the old one still aliases the previous comparator.
+        if (getMemtableThreadSafe().initialComparator != metadata.comparator)
+            switchMemtable(true, true);
     }
 
     private void maybeReloadCompactionStrategy()
@@ -610,15 +585,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return desc.filenameFor(Component.DATA);
     }
 
-    /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already.  threadsafe. */
-    public Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog)
+    /**
+     * Switch and flush the current memtable, if it is dirty. The forceSwitch
+     * flag allows forcing the memtable switch even if it is clean (though
+     * in that case we don't flush, as there is no point).
+     */
+    public Future<?> switchMemtable(final boolean writeCommitLog, boolean forceSwitch)
     {
-        if (oldMemtable.isFrozen())
-        {
-            logger.debug("memtable is already frozen; another thread must be flushing it");
-            return null;
-        }
-
         /*
          * If we can get the writelock, that means no new updates can come in and
          * all ongoing updates to memtables have completed. We can get the tail
@@ -632,13 +605,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Table.switchLock.writeLock().lock();
         try
         {
-            if (oldMemtable.isFrozen())
-            {
-                logger.debug("memtable is already frozen; another thread must be flushing it");
-                return null;
-            }
-
-            assert getMemtableThreadSafe() == oldMemtable;
             final Future<ReplayPosition> ctx = writeCommitLog ? CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE);
 
             // submit the memtable for any indexed sub-cfses, and our own.
@@ -646,20 +612,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // don't assume that this.memtable is dirty; forceFlush can bring us here during index build even if it is not
             for (ColumnFamilyStore cfs : concatWithIndexes())
             {
-                Memtable mt = cfs.getMemtableThreadSafe();
-                if (!mt.isClean() && !mt.isFrozen())
-                {
-                    // We need to freeze indexes too because they can be concurrently flushed too (#3547)
-                    mt.freeze();
+                if (forceSwitch || !cfs.getMemtableThreadSafe().isClean())
                     icc.add(cfs);
-                }
             }
+
             final CountDownLatch latch = new CountDownLatch(icc.size());
             for (ColumnFamilyStore cfs : icc)
             {
                 Memtable memtable = cfs.data.switchMemtable();
-                logger.info("Enqueuing flush of {}", memtable);
-                memtable.flushAndSignal(latch, flushWriter, ctx);
+                // With forceSwitch it's possible to get a clean memtable here.
+                // In that case, since we've switched it already, just remove
+                // it from the memtables pending flush right away.
+                if (memtable.isClean())
+                {
+                    cfs.replaceFlushed(memtable, null);
+                    latch.countDown();
+                }
+                else
+                {
+                    logger.info("Enqueuing flush of {}", memtable);
+                    memtable.flushAndSignal(latch, flushWriter, ctx);
+                }
             }
 
             if (metric.memtableSwitchCount.count() == Long.MAX_VALUE)
@@ -713,17 +686,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (clean)
         {
             logger.debug("forceFlush requested but everything is clean in {}", columnFamily);
-            return null;
+            return Futures.immediateCheckedFuture(null);
         }
 
-        return maybeSwitchMemtable(getMemtableThreadSafe(), true);
+        return switchMemtable(true, false);
     }
 
     public void forceBlockingFlush() throws ExecutionException, InterruptedException
     {
-        Future<?> future = forceFlush();
-        if (future != null)
-            future.get();
+        forceFlush().get();
     }
 
     public void maybeUpdateRowCache(DecoratedKey key, ColumnFamily columnFamily)
@@ -1046,16 +1017,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return (int) metric.memtableSwitchCount.count();
     }
 
-    /**
-     * get the current memtable in a threadsafe fashion.  note that simply "return memtable_" is
-     * incorrect; you need to lock to introduce a thread safe happens-before ordering.
-     *
-     * do NOT use this method to do either a put or get on the memtable object, since it could be
-     * flushed in the meantime (and its executor terminated).
-     *
-     * also do NOT make this method public or it will really get impossible to reason about these things.
-     * @return
-     */
     private Memtable getMemtableThreadSafe()
     {
         return data.getMemtable();
@@ -1783,7 +1744,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 for (ColumnFamilyStore cfs : concatWithIndexes())
                 {
                     Memtable mt = cfs.getMemtableThreadSafe();
-                    if (!mt.isClean() && !mt.isFrozen())
+                    if (!mt.isClean())
                     {
                         mt.cfs.data.renewMemtable();
                     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index a9f6427..30614b7 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -634,10 +634,7 @@ public class DefsTable
 
     private static void flushSchemaCF(String cfName)
     {
-        Future<?> flush = SystemTable.schemaCFS(cfName).forceFlush();
-
-        if (flush != null)
-            FBUtilities.waitOnFuture(flush);
+        FBUtilities.waitOnFuture(SystemTable.schemaCFS(cfName).forceFlush());
     }
 
     private static ByteBuffer toUTF8Bytes(UUID version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 82d22ca..56e2bf5 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -81,7 +81,6 @@ public class Memtable
 
     volatile static Memtable activelyMeasuring;
 
-    private volatile boolean isFrozen;
     private final AtomicLong currentSize = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
 
@@ -141,16 +140,6 @@ public class Memtable
         return currentOperations.get();
     }
 
-    boolean isFrozen()
-    {
-        return isFrozen;
-    }
-
-    void freeze()
-    {
-        isFrozen = true;
-    }
-
     /**
      * Should only be called by ColumnFamilyStore.apply.  NOT a public API.
      * (CFS handles locking to avoid submitting an op
@@ -158,7 +147,6 @@ public class Memtable
     */
     void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
     {
-        assert !isFrozen; // not 100% foolproof but hell, it's an assert
         resolve(key, columnFamily, indexer);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 18b7e4b..bb5e6ee 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -450,11 +450,7 @@ public class Table
     {
         List<Future<?>> futures = new ArrayList<Future<?>>();
         for (UUID cfId : columnFamilyStores.keySet())
-        {
-            Future<?> future = columnFamilyStores.get(cfId).forceFlush();
-            if (future != null)
-                futures.add(future);
-        }
+            futures.add(columnFamilyStores.get(cfId).forceFlush());
         return futures;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 08dc5fc..5577680 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -479,11 +479,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                     if (!ksm.durableWrites)
                     {
                         for (ColumnFamilyStore cfs : table.getColumnFamilyStores())
-                        {
-                            Future<?> future = cfs.forceFlush();
-                            if (future != null)
-                                flushes.add(future);
-                        }
+                            flushes.add(cfs.forceFlush());
                     }
                 }
                 FBUtilities.waitOnFutures(flushes);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 2ade0c6..7043be4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -93,11 +93,7 @@ public class StreamOut
         logger.info("Flushing memtables for {}...", stores);
         List<Future<?>> flushes = new ArrayList<Future<?>>();
         for (ColumnFamilyStore cfstore : stores)
-        {
-            Future<?> flush = cfstore.forceFlush();
-            if (flush != null)
-                flushes.add(flush);
-        }
+            flushes.add(cfstore.forceFlush());
         FBUtilities.waitOnFutures(flushes);
     }