You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/12/17 11:30:13 UTC
git commit: Fix potential infinite loop when reloading CFS
Updated Branches:
refs/heads/cassandra-1.2.0 ab071c505 -> 9b217e467
Fix potential infinite loop when reloading CFS
patch by slebresne; reviewed by jbellis for CASSANDRA-5064
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9b217e46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9b217e46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9b217e46
Branch: refs/heads/cassandra-1.2.0
Commit: 9b217e4676187cd6b4f05b6724c199dd15e03f73
Parents: ab071c5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Dec 17 11:29:15 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Dec 17 11:29:15 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 99 +++++----------
src/java/org/apache/cassandra/db/DefsTable.java | 5 +-
src/java/org/apache/cassandra/db/Memtable.java | 12 --
src/java/org/apache/cassandra/db/Table.java | 6 +-
.../apache/cassandra/service/StorageService.java | 6 +-
.../org/apache/cassandra/streaming/StreamOut.java | 6 +-
7 files changed, 36 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a9df1b..327d427 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
-1.2.0
+1.2.0-rc2
* cqlsh: add DESCRIBE KEYSPACES command (CASSANDRA-5060)
+ * Fix potential infinite loop when reloading CFS (CASSANDRA-5064)
Merged from 1.1:
* fix temporarily missing schema after upgrade from pre-1.1.5 (CASSANDRA-5061)
* Fix ALTER TABLE overriding compression options with defaults
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9cb4c66..364565f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -81,10 +81,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
/*
- * maybeSwitchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete,
+ * switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete,
* we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
*
- * There are two other things that maybeSwitchMemtable does.
+ * There are two other things that switchMemtable does.
* First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
* and it's been added as an SSTableReader to ssTables_. Second, it adds an entry to commitLogUpdater
* that waits for the flush to complete, then calls onMemtableFlush. This allows multiple flushes
@@ -157,34 +157,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
indexManager.reload();
// If the CF comparator has changed, we need to change the memtable,
- // because the old one still aliases the previous comparator. We don't
- // call forceFlush() because it can skip the switch if the memtable is
- // clean, which we don't want here. Also, because there can be a race
- // between the time we acquire the current memtable and we flush it
- // (another thread can have flushed it first), we attempt the switch
- // until we know the memtable has the current comparator.
- try
- {
- while (true)
- {
- AbstractType comparator = metadata.comparator;
- Memtable memtable = getMemtableThreadSafe();
- if (memtable.initialComparator == comparator)
- break;
-
- Future future = maybeSwitchMemtable(getMemtableThreadSafe(), true);
- if (future != null)
- future.get();
- }
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ // because the old one still aliases the previous comparator.
+ if (getMemtableThreadSafe().initialComparator != metadata.comparator)
+ switchMemtable(true, true);
}
private void maybeReloadCompactionStrategy()
@@ -610,15 +585,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return desc.filenameFor(Component.DATA);
}
- /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. */
- public Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog)
+ /**
+ * Switch and flush the current memtable, if it is dirty. The forceSwitch
+ * flag allows forcing the switch even if the memtable is clean (though
+ * in that case we don't flush, as there is no point).
+ */
+ public Future<?> switchMemtable(final boolean writeCommitLog, boolean forceSwitch)
{
- if (oldMemtable.isFrozen())
- {
- logger.debug("memtable is already frozen; another thread must be flushing it");
- return null;
- }
-
/*
* If we can get the writelock, that means no new updates can come in and
* all ongoing updates to memtables have completed. We can get the tail
@@ -632,13 +605,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Table.switchLock.writeLock().lock();
try
{
- if (oldMemtable.isFrozen())
- {
- logger.debug("memtable is already frozen; another thread must be flushing it");
- return null;
- }
-
- assert getMemtableThreadSafe() == oldMemtable;
final Future<ReplayPosition> ctx = writeCommitLog ? CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE);
// submit the memtable for any indexed sub-cfses, and our own.
@@ -646,20 +612,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// don't assume that this.memtable is dirty; forceFlush can bring us here during index build even if it is not
for (ColumnFamilyStore cfs : concatWithIndexes())
{
- Memtable mt = cfs.getMemtableThreadSafe();
- if (!mt.isClean() && !mt.isFrozen())
- {
- // We need to freeze indexes too because they can be concurrently flushed too (#3547)
- mt.freeze();
+ if (forceSwitch || !cfs.getMemtableThreadSafe().isClean())
icc.add(cfs);
- }
}
+
final CountDownLatch latch = new CountDownLatch(icc.size());
for (ColumnFamilyStore cfs : icc)
{
Memtable memtable = cfs.data.switchMemtable();
- logger.info("Enqueuing flush of {}", memtable);
- memtable.flushAndSignal(latch, flushWriter, ctx);
+ // With forceSwitch it's possible to get a clean memtable here.
+ // In that case, since we've switched it already, just remove
+ // it from the memtable pending flush right away.
+ if (memtable.isClean())
+ {
+ cfs.replaceFlushed(memtable, null);
+ latch.countDown();
+ }
+ else
+ {
+ logger.info("Enqueuing flush of {}", memtable);
+ memtable.flushAndSignal(latch, flushWriter, ctx);
+ }
}
if (metric.memtableSwitchCount.count() == Long.MAX_VALUE)
@@ -713,17 +686,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (clean)
{
logger.debug("forceFlush requested but everything is clean in {}", columnFamily);
- return null;
+ return Futures.immediateCheckedFuture(null);
}
- return maybeSwitchMemtable(getMemtableThreadSafe(), true);
+ return switchMemtable(true, false);
}
public void forceBlockingFlush() throws ExecutionException, InterruptedException
{
- Future<?> future = forceFlush();
- if (future != null)
- future.get();
+ forceFlush().get();
}
public void maybeUpdateRowCache(DecoratedKey key, ColumnFamily columnFamily)
@@ -1046,16 +1017,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return (int) metric.memtableSwitchCount.count();
}
- /**
- * get the current memtable in a threadsafe fashion. note that simply "return memtable_" is
- * incorrect; you need to lock to introduce a thread safe happens-before ordering.
- *
- * do NOT use this method to do either a put or get on the memtable object, since it could be
- * flushed in the meantime (and its executor terminated).
- *
- * also do NOT make this method public or it will really get impossible to reason about these things.
- * @return
- */
private Memtable getMemtableThreadSafe()
{
return data.getMemtable();
@@ -1783,7 +1744,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (ColumnFamilyStore cfs : concatWithIndexes())
{
Memtable mt = cfs.getMemtableThreadSafe();
- if (!mt.isClean() && !mt.isFrozen())
+ if (!mt.isClean())
{
mt.cfs.data.renewMemtable();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index a9f6427..30614b7 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -634,10 +634,7 @@ public class DefsTable
private static void flushSchemaCF(String cfName)
{
- Future<?> flush = SystemTable.schemaCFS(cfName).forceFlush();
-
- if (flush != null)
- FBUtilities.waitOnFuture(flush);
+ FBUtilities.waitOnFuture(SystemTable.schemaCFS(cfName).forceFlush());
}
private static ByteBuffer toUTF8Bytes(UUID version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 82d22ca..56e2bf5 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -81,7 +81,6 @@ public class Memtable
volatile static Memtable activelyMeasuring;
- private volatile boolean isFrozen;
private final AtomicLong currentSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -141,16 +140,6 @@ public class Memtable
return currentOperations.get();
}
- boolean isFrozen()
- {
- return isFrozen;
- }
-
- void freeze()
- {
- isFrozen = true;
- }
-
/**
* Should only be called by ColumnFamilyStore.apply. NOT a public API.
* (CFS handles locking to avoid submitting an op
@@ -158,7 +147,6 @@ public class Memtable
*/
void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
{
- assert !isFrozen; // not 100% foolproof but hell, it's an assert
resolve(key, columnFamily, indexer);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 18b7e4b..bb5e6ee 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -450,11 +450,7 @@ public class Table
{
List<Future<?>> futures = new ArrayList<Future<?>>();
for (UUID cfId : columnFamilyStores.keySet())
- {
- Future<?> future = columnFamilyStores.get(cfId).forceFlush();
- if (future != null)
- futures.add(future);
- }
+ futures.add(columnFamilyStores.get(cfId).forceFlush());
return futures;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 08dc5fc..5577680 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -479,11 +479,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (!ksm.durableWrites)
{
for (ColumnFamilyStore cfs : table.getColumnFamilyStores())
- {
- Future<?> future = cfs.forceFlush();
- if (future != null)
- flushes.add(future);
- }
+ flushes.add(cfs.forceFlush());
}
}
FBUtilities.waitOnFutures(flushes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b217e46/src/java/org/apache/cassandra/streaming/StreamOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java
index 2ade0c6..7043be4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOut.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOut.java
@@ -93,11 +93,7 @@ public class StreamOut
logger.info("Flushing memtables for {}...", stores);
List<Future<?>> flushes = new ArrayList<Future<?>>();
for (ColumnFamilyStore cfstore : stores)
- {
- Future<?> flush = cfstore.forceFlush();
- if (flush != null)
- flushes.add(flush);
- }
+ flushes.add(cfstore.forceFlush());
FBUtilities.waitOnFutures(flushes);
}