You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/12/19 23:55:54 UTC
[2/3] git commit: simplify CompactionIterable.getReduced patch by
jbellis; reviewed by Carl Yeksigian for CASSANDRA-5077
simplify CompactionIterable.getReduced
patch by jbellis; reviewed by Carl Yeksigian for CASSANDRA-5077
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e3d0491
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e3d0491
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e3d0491
Branch: refs/heads/trunk
Commit: 4e3d0491514225980972a2253330acf255687acd
Parents: f252842
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Dec 19 16:52:18 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Dec 19 16:54:22 2012 -0600
----------------------------------------------------------------------
.../db/compaction/CompactionIterable.java | 15 +++-------
.../cassandra/db/compaction/CompactionTask.java | 15 ++++++----
.../db/compaction/ParallelCompactionIterable.java | 22 +--------------
.../cassandra/db/compaction/PrecompactedRow.java | 2 -
4 files changed, 14 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e3d0491/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 9f4f7a0..32b4942 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -70,17 +70,10 @@ public class CompactionIterable extends AbstractCompactionIterable
CompactionIterable.this.updateCounterFor(rows.size());
try
{
- AbstractCompactedRow compactedRow = controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
- if (compactedRow.isEmpty())
- {
- controller.invalidateCachedRow(compactedRow.key);
- return null;
- }
-
- // If the row is cached, we call removeDeleted on at read time it to have coherent query returns,
- // but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
- controller.removeDeletedInCache(compactedRow.key);
- return compactedRow;
+ // create a new container for rows, since we're going to clear ours for the next one,
+ // and the AbstractCompactionRow code should be able to assume that the collection it receives
+ // won't be pulled out from under it.
+ return controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e3d0491/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 481b7f4..3f6901a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -129,7 +129,6 @@ public class CompactionTask extends AbstractCompactionTask
? new ParallelCompactionIterable(compactionType, strategy.getScanners(toCompact), controller)
: new CompactionIterable(compactionType, strategy.getScanners(toCompact), controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
// we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
@@ -143,7 +142,7 @@ public class CompactionTask extends AbstractCompactionTask
collector.beginCompaction(ci);
try
{
- if (!nni.hasNext())
+ if (!iter.hasNext())
{
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
@@ -154,17 +153,21 @@ public class CompactionTask extends AbstractCompactionTask
SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
writers.add(writer);
- while (nni.hasNext())
+ while (iter.hasNext())
{
if (ci.isStopRequested())
throw new CompactionInterruptedException(ci.getCompactionInfo());
- AbstractCompactedRow row = nni.next();
+ AbstractCompactedRow row = iter.next();
if (row.isEmpty())
{
+ controller.invalidateCachedRow(row.key);
row.close();
continue;
}
+ // If the row is cached, we call removeDeleted on at read time it to have coherent query returns,
+ // but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
+ controller.removeDeletedInCache(row.key);
RowIndexEntry indexEntry = writer.append(row);
totalkeysWritten++;
@@ -180,12 +183,12 @@ public class CompactionTask extends AbstractCompactionTask
}
}
}
- if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
+ if (!iter.hasNext() || newSSTableSegmentThresholdReached(writer))
{
SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
cachedKeyMap.put(toIndex, cachedKeys);
sstables.add(toIndex);
- if (nni.hasNext())
+ if (iter.hasNext())
{
writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
writers.add(writer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e3d0491/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index eaf35f2..7e1983c 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -109,27 +109,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
throw new RuntimeException(e);
}
- if (compactedRow.isEmpty())
- {
- controller.invalidateCachedRow(compactedRow.key);
- try
- {
- compactedRow.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- return null;
- }
- else
- {
- // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
- // like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
- // memory on long running instances
- controller.invalidateCachedRow(compactedRow.key);
- return compactedRow;
- }
+ return compactedRow;
}
public void close() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e3d0491/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 6fc2f17..2d7f55a 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -46,8 +46,6 @@ public class PrecompactedRow extends AbstractCompactedRow
compactedCf = cf;
}
-
-
public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
{
assert key != null;