You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/10/17 01:08:31 UTC

[3/3] git commit: fix wrong leveled compaction progress calculation; patch by yukim reviewed by jbellis for CASSANDRA-4807

fix wrong leveled compaction progress calculation; patch by yukim reviewed by jbellis for CASSANDRA-4807


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b8874ad1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b8874ad1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b8874ad1

Branch: refs/heads/cassandra-1.1
Commit: b8874ad1a892cfc45c87ed1187d43df356f48315
Parents: e1491f3
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 16 16:23:46 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Oct 16 16:23:46 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +
 .../db/compaction/LeveledCompactionStrategy.java   |    8 +++-
 .../compaction/LeveledCompactionStrategyTest.java  |   41 +++++++++++++++
 3 files changed, 51 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8874ad1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1a37b8e..f243c74 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+1.1.7
+ * fix wrong leveled compaction progress calculation (CASSANDRA-4807)
+
 1.1.6
  * Wait for writes on synchronous read digest mismatch (CASSANDRA-4792)
  * fix commitlog replay for nanotime-infected sstables (CASSANDRA-4782)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8874ad1/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9fb6a89..71d8871 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import org.slf4j.Logger;
@@ -47,7 +48,8 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
 {
     private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
 
-    private final LeveledManifest manifest;
+    @VisibleForTesting
+    final LeveledManifest manifest;
     private final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
     private final int maxSSTableSizeInMB;
     private final AtomicReference<LeveledCompactionTask> task = new AtomicReference<LeveledCompactionTask>();
@@ -237,7 +239,11 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
                     positionOffset += currentScanner.getLengthInBytes();
                     currentScanner.close();
                     if (!sstableIterator.hasNext())
+                    {
+                        // reset to null so getCurrentPosition does not return wrong value
+                        currentScanner = null;
                         return endOfData();
+                    }
                     currentScanner = sstableIterator.next().getDirectScanner(range);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b8874ad1/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 051aff8..c372c8f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -30,6 +30,8 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -84,4 +86,43 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         AntiEntropyService.Validator validator = new AntiEntropyService.Validator(req);
         CompactionManager.instance.submitValidation(store, validator).get();
     }
+
+    @Test
+    public void testCompactionProgress() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLeveled";
+        Table table = Table.open(ksname);
+        ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
+
+        // make sure we have SSTables in L1
+        ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]);
+        int rows = 2;
+        int columns = 10;
+        for (int r = 0; r < rows; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            for (int c = 0; c < columns; c++)
+            {
+                rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes("column" + c)), value, 0);
+            }
+            rm.apply();
+            store.forceBlockingFlush();
+        }
+        store.forceMajorCompaction();
+
+        LeveledCompactionStrategy strat = (LeveledCompactionStrategy)store.getCompactionStrategy();
+        assert strat.getLevelSize(1) > 0;
+
+        // get LeveledScanner for level 1 sstables
+        Collection<SSTableReader> sstables = strat.manifest.getLevel(1);
+        ICompactionScanner scanner = strat.getScanners(sstables).get(0);
+        // scan through to the end
+        while (scanner.hasNext())
+            scanner.next();
+
+        // scanner.getCurrentPosition should be equal to total bytes of L1 sstables
+        assert scanner.getCurrentPosition() == SSTable.getTotalBytes(sstables);
+    }
 }