Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/26 16:22:08 UTC
cassandra git commit: Handle abort() properly in SSTableRewriter
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 3faff8b15 -> b10629291
Handle abort() properly in SSTableRewriter
Patch by marcuse; reviewed by jmckenzie for CASSANDRA-8320
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1062929
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1062929
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1062929
Branch: refs/heads/cassandra-2.1
Commit: b1062929185690567e4567e0e657b361c5105482
Parents: 3faff8b
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 18 07:07:30 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Nov 26 16:00:51 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableReader.java | 22 +++
.../cassandra/io/sstable/SSTableRewriter.java | 74 ++++++--
.../io/sstable/SSTableRewriterTest.java | 180 +++++++++++++++----
4 files changed, 226 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
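For context, the caller pattern this patch hardens is the one the new unit tests below exercise: a compaction-style loop drives an SSTableRewriter and must be able to call abort() safely if anything fails before or during finish(). A minimal sketch of that pattern follows, assuming the 2.1-era API and the helpers visible in the test diff (cfs, sstables, directory, getWriter and the maxAge of 1000 are stand-ins taken from the tests); it is an illustration, not part of the commit:

    SSTableRewriter rewriter = new SSTableRewriter(cfs, sstables, 1000, false);
    rewriter.switchWriter(getWriter(cfs, directory));
    try (ICompactionScanner scanner = sstables.iterator().next().getScanner();
         CompactionController controller = new CompactionController(cfs, sstables, 0))
    {
        while (scanner.hasNext())
            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
        // finish() now drains finishedWriters itself and opens the final readers
        rewriter.finish();
    }
    catch (Throwable t)
    {
        // CASSANDRA-8320: abort() is now safe even when no writer is active or when
        // finish() failed part-way; tmp/tmplink files are removed and references released
        rewriter.abort();
    }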
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f022b19..e5f7c28 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
* Fix high size calculations for prepared statements (CASSANDRA-8231)
* Centralize shared executors (CASSANDRA-8055)
* Fix filtering for CONTAINS (KEY) relations on frozen collection
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a3e3cf5..1fe4330 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -202,6 +202,7 @@ public class SSTableReader extends SSTable
private Object replaceLock = new Object();
private SSTableReader replacedBy;
private SSTableReader replaces;
+ private SSTableReader sharesBfWith;
private SSTableDeletingTask deletingTask;
private Runnable runOnClose;
@@ -594,6 +595,14 @@ public class SSTableReader extends SSTable
deleteFiles &= !dfile.path.equals(replaces.dfile.path);
}
+ if (sharesBfWith != null)
+ {
+ closeBf &= sharesBfWith.bf != bf;
+ closeSummary &= sharesBfWith.indexSummary != indexSummary;
+ closeFiles &= sharesBfWith.dfile != dfile;
+ deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
+ }
+
boolean deleteAll = false;
if (release && isCompacted.get())
{
@@ -928,6 +937,19 @@ public class SSTableReader extends SSTable
}
}
+ /**
+ * This is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter.
+ *
+ * Note that the reason we don't use replacedBy is that this reader has not actually been replaced yet.
+ *
+ * @param newReader the newly opened reader that shares this reader's bloom filter
+ */
+ public void sharesBfWith(SSTableReader newReader)
+ {
+ assert openReason.equals(OpenReason.EARLY);
+ this.sharesBfWith = newReader;
+ }
+
public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
{
synchronized (replaceLock)
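The sharesBfWith hook above exists because, during finish(), the early-opened tmplink reader and the newly opened final reader end up referencing the same bloom filter, index summary and data file; without the guard added to the release logic, closing the tmplink reader would free components the final reader still uses. A rough sketch of the sequence, using writer-API names that appear in the tests below (openEarly, closeAndOpenReader) — illustration only:

    SSTableReader early = writer.openEarly(maxAge);                // tmplink reader opened mid-write
    // ... keep appending to the writer ...
    SSTableReader finalReader = writer.closeAndOpenReader(maxAge); // the real sstable
    early.sharesBfWith(finalReader);                               // record the shared components
    // when 'early' is eventually cleaned up, any bf/summary/dfile it shares with
    // 'finalReader' is left open, and the shared data file is not deleted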
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 4d5a06f..d187e9d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -75,6 +76,7 @@ public class SSTableRewriter
private final ColumnFamilyStore cfs;
private final long maxAge;
+ private final List<SSTableReader> finished = new ArrayList<>();
private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
@@ -180,16 +182,11 @@ public class SSTableRewriter
public void abort()
{
- if (writer == null)
- return;
-
switchWriter(null);
moveStarts(null, Functions.forMap(originalStarts), true);
List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
- if (currentlyOpenedEarly != null)
- close.add(currentlyOpenedEarly);
for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
{
@@ -202,6 +199,12 @@ public class SSTableRewriter
for (SSTableReader sstable : close)
sstable.markObsolete();
+ for (SSTableReader sstable : finished)
+ {
+ sstable.markObsolete();
+ sstable.releaseReference();
+ }
+
// releases reference in replaceReaders
if (!isOffline)
{
@@ -210,6 +213,7 @@ public class SSTableRewriter
}
}
+
/**
* Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
* needed, and transferring any key cache entries over to the new reader, expiring them from the old. if reset
@@ -327,38 +331,70 @@ public class SSTableRewriter
*/
public List<SSTableReader> finish(long repairedAt)
{
- List<SSTableReader> finished = new ArrayList<>();
- if (writer.getFilePointer() > 0)
- {
- SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
- finished.add(reader);
- replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
- moveStarts(reader, Functions.constant(reader.last), false);
- }
- else
- {
- writer.abort(true);
- }
+ List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
+ switchWriter(null);
// make real sstables of the written ones:
- for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+ Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
+ while (it.hasNext())
{
+ Pair<SSTableWriter, SSTableReader> w = it.next();
if (w.left.getFilePointer() > 0)
{
SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
finished.add(newReader);
+
+ if (w.right != null)
+ w.right.sharesBfWith(newReader);
// w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
- replaceEarlyOpenedFile(w.right, newReader);
+ toReplace.add(Pair.create(w.right, newReader));
}
else
{
assert w.right == null;
w.left.abort(true);
}
+ it.remove();
}
+
+ for (Pair<SSTableReader, SSTableReader> replace : toReplace)
+ replaceEarlyOpenedFile(replace.left, replace.right);
+
if (!isOffline)
{
dataTracker.unmarkCompacting(finished);
}
return finished;
}
+
+ @VisibleForTesting
+ void finishAndThrow(boolean early)
+ {
+ List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
+ switchWriter(null);
+ if (early)
+ throw new RuntimeException("exception thrown early in finish");
+ // make real sstables of the written ones:
+ Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
+ while (it.hasNext())
+ {
+ Pair<SSTableWriter, SSTableReader> w = it.next();
+ if (w.left.getFilePointer() > 0)
+ {
+ SSTableReader newReader = w.left.closeAndOpenReader(maxAge);
+ finished.add(newReader);
+ if (w.right != null)
+ w.right.sharesBfWith(newReader);
+ // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
+ toReplace.add(Pair.create(w.right, newReader));
+ }
+ else
+ {
+ assert w.right == null;
+ w.left.abort(true);
+ }
+ it.remove();
+ }
+
+ throw new RuntimeException("exception thrown after all sstables finished");
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 8a494a6..0a76b66 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -29,9 +30,11 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ArrayBackedSortedColumns;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
@@ -80,12 +83,45 @@ public class SSTableRewriterTest extends SchemaLoader
writer.append(row);
}
}
- cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
-
+ Collection<SSTableReader> newsstables = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
+ Thread.sleep(100);
validateCFS(cfs);
+ int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+ assertEquals(1, filecounts);
}
+ @Test
+ public void basicTest2() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+ assertEquals(1, sstables.size());
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+ try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
+ {
+ ICompactionScanner scanner = scanners.scanners.get(0);
+ CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+ writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+ while (scanner.hasNext())
+ {
+ AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+ writer.append(row);
+ }
+ }
+ Collection<SSTableReader> newsstables = writer.finish();
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
+ Thread.sleep(100);
+ validateCFS(cfs);
+ int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+ assertEquals(1, filecounts);
+ }
@Test
public void testFileRemoval() throws InterruptedException
@@ -114,37 +150,11 @@ public class SSTableRewriterTest extends SchemaLoader
assertFileCounts(dir.list(), 0, 3);
writer.abort(false);
Thread.sleep(1000);
- assertFileCounts(dir.list(), 0, 0);
- validateCFS(cfs);
- }
-
- @Test
- public void testFileRemovalNoAbort() throws InterruptedException
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
- cfs.truncateBlocking();
- ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
- for (int i = 0; i < 1000; i++)
- cf.addColumn(Util.column(String.valueOf(i), "a", 1));
- File dir = cfs.directories.getDirectoryForNewSSTables();
- SSTableWriter writer = getWriter(cfs, dir);
-
- for (int i = 0; i < 500; i++)
- writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
- SSTableReader s = writer.openEarly(1000);
- //assertFileCounts(dir.list(), 2, 3);
- for (int i = 500; i < 1000; i++)
- writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
- writer.closeAndOpenReader();
- s.markObsolete();
- s.releaseReference();
- Thread.sleep(1000);
- assertFileCounts(dir.list(), 0, 0);
+ int datafiles = assertFileCounts(dir.list(), 0, 0);
+ assertEquals(datafiles, 0);
validateCFS(cfs);
}
-
@Test
public void testNumberOfFilesAndSizes() throws Exception
{
@@ -446,6 +456,95 @@ public class SSTableRewriterTest extends SchemaLoader
assertFileCounts(s.descriptor.directory.list(), 0, 0);
validateCFS(cfs);
}
+ @Test
+ public void testAbort() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+ rewriter.switchWriter(w);
+ try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ while (scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ }
+ }
+ try
+ {
+ rewriter.finishAndThrow(false);
+ }
+ catch (Throwable t)
+ {
+ rewriter.abort();
+ }
+ }
+ Thread.sleep(1000);
+ int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ assertEquals(filecount, 1);
+ assertEquals(1, cfs.getSSTables().size());
+ validateCFS(cfs);
+ cfs.truncateBlocking();
+ Thread.sleep(1000);
+ filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ assertEquals(0, filecount);
+
+ }
+
+ @Test
+ public void testAbort2() throws Exception
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ cfs.truncateBlocking();
+ SSTableReader s = writeFile(cfs, 1000);
+ cfs.addSSTable(s);
+ Set<SSTableReader> compacting = Sets.newHashSet(s);
+ SSTableRewriter.overrideOpenInterval(10000000);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+ rewriter.switchWriter(w);
+ try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+ CompactionController controller = new CompactionController(cfs, compacting, 0))
+ {
+ while (scanner.hasNext())
+ {
+ rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+ if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+ {
+ rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+ }
+ }
+ try
+ {
+ rewriter.finishAndThrow(true);
+ }
+ catch (Throwable t)
+ {
+ rewriter.abort();
+ }
+ }
+ Thread.sleep(1000);
+ int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ assertEquals(filecount, 1);
+ assertEquals(1, cfs.getSSTables().size());
+ validateCFS(cfs);
+ cfs.truncateBlocking();
+ Thread.sleep(1000);
+ filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ assertEquals(0, filecount);
+
+ }
private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
@@ -469,28 +568,45 @@ public class SSTableRewriterTest extends SchemaLoader
private void validateCFS(ColumnFamilyStore cfs)
{
+ Set<Integer> liveDescriptors = new HashSet<>();
for (SSTableReader sstable : cfs.getSSTables())
{
assertFalse(sstable.isMarkedCompacted());
assertEquals(1, sstable.referenceCount());
+ liveDescriptors.add(sstable.descriptor.generation);
+ }
+ for (File dir : cfs.directories.getCFDirectories())
+ {
+ for (String f : dir.list())
+ {
+ if (f.contains("Data"))
+ {
+ Descriptor d = Descriptor.fromFilename(f);
+ assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+ }
+ }
}
assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
}
- private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+ private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
{
int tmplinkcount = 0;
int tmpcount = 0;
+ int datacount = 0;
for (String f : files)
{
if (f.contains("-tmplink-"))
tmplinkcount++;
- if (f.contains("-tmp-"))
+ else if (f.contains("-tmp-"))
tmpcount++;
+ else if (f.contains("Data"))
+ datacount++;
}
assertEquals(expectedtmplinkCount, tmplinkcount);
assertEquals(expectedtmpCount, tmpcount);
+ return datacount;
}
private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)