You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/01/11 19:56:59 UTC
[1/6] git commit: better handling for mid-compaction failure;
patch by yukim reviewed by slebresne for CASSANDRA-5137
better handling for mid-compaction failure; patch by yukim reviewed by slebresne for CASSANDRA-5137
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3cc8656f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3cc8656f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3cc8656f
Branch: refs/heads/cassandra-1.1
Commit: 3cc8656f8fbb67c7e665fe27642076ae0109c2b5
Parents: 1cbbba0
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Jan 11 12:32:59 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Jan 11 12:32:59 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 35 ++++++++++-----
.../cassandra/db/compaction/CompactionTask.java | 28 +++++++-----
3 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 82f503c..6c76151 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@
* fix user defined compaction to run against 1.1 data directory (CASSANDRA-5118)
* Fix CQL3 BATCH authorization caching (CASSANDRA-5145)
* fix get_count returns incorrect value with TTL (CASSANDRA-5099)
+ * better handling for mid-compaction failure (CASSANDRA-5137)
1.1.8
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8284d38..2781800 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -244,20 +244,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Directories.SSTableLister sstableFiles = directories.sstableLister().skipCompacted(true).skipTemporary(true);
Collection<SSTableReader> sstables = SSTableReader.batchOpen(sstableFiles.list().entrySet(), savedKeys, data, metadata, this.partitioner);
- // Filter non-compacted sstables, remove compacted ones
- Set<Integer> compactedSSTables = new HashSet<Integer>();
- for (SSTableReader sstable : sstables)
- compactedSSTables.addAll(sstable.getAncestors());
+ if (metadata.getDefaultValidator().isCommutative())
+ {
+ // Filter non-compacted sstables, remove compacted ones
+ Set<Integer> compactedSSTables = new HashSet<Integer>();
+ for (SSTableReader sstable : sstables)
+ compactedSSTables.addAll(sstable.getAncestors());
- Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
- for (SSTableReader sstable : sstables)
+ Set<SSTableReader> liveSSTables = new HashSet<SSTableReader>();
+ for (SSTableReader sstable : sstables)
+ {
+ if (compactedSSTables.contains(sstable.descriptor.generation))
+ {
+ logger.info("{} is already compacted and will be removed.", sstable);
+ sstable.markCompacted(); // we need to mark as compacted to be deleted
+ sstable.releaseReference(); // this amounts to deleting the sstable
+ }
+ else
+ {
+ liveSSTables.add(sstable);
+ }
+ }
+ data.addInitialSSTables(liveSSTables);
+ }
+ else
{
- if (compactedSSTables.contains(sstable.descriptor.generation))
- sstable.releaseReference(); // this amount to deleting the sstable
- else
- liveSSTables.add(sstable);
+ data.addInitialSSTables(sstables);
}
- data.addInitialSSTables(liveSSTables);
}
// compaction strategy should be created after the CFS has been prepared
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cc8656f/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index b252bc5..714e308 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -32,9 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
@@ -127,7 +125,7 @@ public class CompactionTask extends AbstractCompactionTask
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
// replace the old entries. Track entries to preheat here until then.
- Map<SSTableReader, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<SSTableReader, Map<DecoratedKey, Long>>();
+ Map<Descriptor, Map<DecoratedKey, Long>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, Long>>();
Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
@@ -175,9 +173,8 @@ public class CompactionTask extends AbstractCompactionTask
}
if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
{
- SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
- cachedKeyMap.put(toIndex, cachedKeys);
- sstables.add(toIndex);
+ // tmp = false because later we want to query it with descriptor from SSTableReader
+ cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
if (nni.hasNext())
{
writer = cfs.createCompactionWriter(keysPerSSTable, compactionFileLocation, toCompact);
@@ -186,11 +183,21 @@ public class CompactionTask extends AbstractCompactionTask
}
}
}
+
+ long maxAge = getMaxDataAge(toCompact);
+ for (SSTableWriter completedWriter : writers)
+ sstables.add(completedWriter.closeAndOpenReader(maxAge));
}
catch (Exception e)
{
for (SSTableWriter writer : writers)
writer.abort();
+ // also remove already completed SSTables
+ for (SSTableReader sstable : sstables)
+ {
+ sstable.markCompacted();
+ sstable.releaseReference();
+ }
throw FBUtilities.unchecked(e);
}
finally
@@ -202,11 +209,10 @@ public class CompactionTask extends AbstractCompactionTask
cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
// TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
- for (Map.Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
+ for (SSTableReader sstable : sstables)
{
- SSTableReader key = ssTableReaderMapEntry.getKey();
- for (Map.Entry<DecoratedKey, Long> entry : ssTableReaderMapEntry.getValue().entrySet())
- key.cacheKey(entry.getKey(), entry.getValue());
+ for (Map.Entry<DecoratedKey, Long> entry : cachedKeyMap.get(sstable.descriptor).entrySet())
+ sstable.cacheKey(entry.getKey(), entry.getValue());
}
long dTime = System.currentTimeMillis() - startTime;