You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/12/28 18:02:22 UTC
[2/3] git commit: make sure we close the last sstablewriter when rows
are dropped; patch by yukim and jbellis for CASSANDRA-5077
make sure we close the last sstablewriter when rows are dropped
patch by yukim and jbellis for CASSANDRA-5077
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d96e32b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d96e32b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d96e32b
Branch: refs/heads/trunk
Commit: 1d96e32b871efaa484246f2a4b6b195648302688
Parents: d6a3845
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Dec 28 12:02:10 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Dec 28 12:02:10 2012 -0500
----------------------------------------------------------------------
.../cassandra/db/compaction/CompactionTask.java | 30 ++++++++++-----
1 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d96e32b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index e4e15bc..7168280 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -169,6 +169,7 @@ public class CompactionTask extends AbstractCompactionTask
row.close();
continue;
}
+
// If the row is cached, we call removeDeleted on it at read time to have coherent query returns,
// but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
controller.removeDeletedInCache(row.key);
@@ -187,19 +188,28 @@ public class CompactionTask extends AbstractCompactionTask
}
}
}
- if (!iter.hasNext() || newSSTableSegmentThresholdReached(writer))
+
+ if (newSSTableSegmentThresholdReached(writer))
{
- SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
- cachedKeyMap.put(toIndex, cachedKeys);
- sstables.add(toIndex);
- if (iter.hasNext())
- {
- writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
- writers.add(writer);
- cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
- }
+ SSTableReader sstable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+ cachedKeyMap.put(sstable, cachedKeys);
+ sstables.add(sstable);
+ writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
+ writers.add(writer);
+ cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
}
}
+
+ if (writer.getFilePointer() > 0)
+ {
+ SSTableReader sstable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+ cachedKeyMap.put(sstable, cachedKeys);
+ sstables.add(sstable);
+ }
+ else
+ {
+ writer.abort();
+ }
}
catch (Throwable t)
{