You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/12/19 23:55:54 UTC

[3/3] git commit: simplify CompactionIterable.getReduced patch by jbellis; reviewed by Carl Yeksigian for CASSANDRA-5077

simplify CompactionIterable.getReduced
patch by jbellis; reviewed by Carl Yeksigian for CASSANDRA-5077


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4e3d0491
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4e3d0491
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4e3d0491

Branch: refs/heads/cassandra-1.2
Commit: 4e3d0491514225980972a2253330acf255687acd
Parents: f252842
Author: Jonathan Ellis <jb...@apache.org>
Authored: Wed Dec 19 16:52:18 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Dec 19 16:54:22 2012 -0600

----------------------------------------------------------------------
 .../db/compaction/CompactionIterable.java          |   15 +++-------
 .../cassandra/db/compaction/CompactionTask.java    |   15 ++++++----
 .../db/compaction/ParallelCompactionIterable.java  |   22 +--------------
 .../cassandra/db/compaction/PrecompactedRow.java   |    2 -
 4 files changed, 14 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e3d0491/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 9f4f7a0..32b4942 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -70,17 +70,10 @@ public class CompactionIterable extends AbstractCompactionIterable
             CompactionIterable.this.updateCounterFor(rows.size());
             try
             {
-                AbstractCompactedRow compactedRow = controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
-                if (compactedRow.isEmpty())
-                {
-                    controller.invalidateCachedRow(compactedRow.key);
-                    return null;
-                }
-
-                // If the row is cached, we call removeDeleted on at read time it to have coherent query returns,
-                // but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
-                controller.removeDeletedInCache(compactedRow.key);
-                return compactedRow;
+                // create a new container for rows, since we're going to clear ours for the next one,
+                // and the AbstractCompactionRow code should be able to assume that the collection it receives
+                // won't be pulled out from under it.
+                return controller.getCompactedRow(new ArrayList<SSTableIdentityIterator>(rows));
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e3d0491/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 481b7f4..3f6901a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -129,7 +129,6 @@ public class CompactionTask extends AbstractCompactionTask
                                       ? new ParallelCompactionIterable(compactionType, strategy.getScanners(toCompact), controller)
                                       : new CompactionIterable(compactionType, strategy.getScanners(toCompact), controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-        Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
         Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 
         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
@@ -143,7 +142,7 @@ public class CompactionTask extends AbstractCompactionTask
             collector.beginCompaction(ci);
         try
         {
-            if (!nni.hasNext())
+            if (!iter.hasNext())
             {
                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
                 // we need to sync it (via closeAndOpen) first, so there is no period during which
@@ -154,17 +153,21 @@ public class CompactionTask extends AbstractCompactionTask
 
             SSTableWriter writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
             writers.add(writer);
-            while (nni.hasNext())
+            while (iter.hasNext())
             {
                 if (ci.isStopRequested())
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                AbstractCompactedRow row = nni.next();
+                AbstractCompactedRow row = iter.next();
                 if (row.isEmpty())
                 {
+                    controller.invalidateCachedRow(row.key);
                     row.close();
                     continue;
                 }
+                // If the row is cached, we call removeDeleted on at read time it to have coherent query returns,
+                // but if the row is not pushed out of the cache, obsolete tombstones will persist indefinitely.
+                controller.removeDeletedInCache(row.key);
 
                 RowIndexEntry indexEntry = writer.append(row);
                 totalkeysWritten++;
@@ -180,12 +183,12 @@ public class CompactionTask extends AbstractCompactionTask
                         }
                     }
                 }
-                if (!nni.hasNext() || newSSTableSegmentThresholdReached(writer))
+                if (!iter.hasNext() || newSSTableSegmentThresholdReached(writer))
                 {
                     SSTableReader toIndex = writer.closeAndOpenReader(getMaxDataAge(toCompact));
                     cachedKeyMap.put(toIndex, cachedKeys);
                     sstables.add(toIndex);
-                    if (nni.hasNext())
+                    if (iter.hasNext())
                     {
                         writer = cfs.createCompactionWriter(keysPerSSTable, cfs.directories.getLocationForDisk(dataDirectory), toCompact);
                         writers.add(writer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e3d0491/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index eaf35f2..7e1983c 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -109,27 +109,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 throw new RuntimeException(e);
             }
 
-            if (compactedRow.isEmpty())
-            {
-                controller.invalidateCachedRow(compactedRow.key);
-                try
-                {
-                    compactedRow.close();
-                }
-                catch (IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
-                return null;
-            }
-            else
-            {
-                // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
-                // like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
-                // memory on long running instances
-                controller.invalidateCachedRow(compactedRow.key);
-                return compactedRow;
-            }
+            return compactedRow;
         }
 
         public void close() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e3d0491/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 6fc2f17..2d7f55a 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -46,8 +46,6 @@ public class PrecompactedRow extends AbstractCompactedRow
         compactedCf = cf;
     }
 
-
-
     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
     {
         assert key != null;