You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/05/02 19:48:48 UTC

[2/4] git commit: Open one sstableScanner per level for leveled compaction; patch by slebresne and jbellis for CASSANDRA-4142

Open one sstableScanner per level for leveled compaction
patch by slebresne and jbellis for CASSANDRA-4142


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/46e422a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/46e422a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/46e422a9

Branch: refs/heads/cassandra-1.1
Commit: 46e422a9417b5b513ceae4c9652ba413e2ede474
Parents: 1686a36
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Apr 27 16:19:24 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed May 2 12:48:23 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../db/compaction/AbstractCompactionIterable.java  |   17 +--
 .../db/compaction/AbstractCompactionStrategy.java  |   23 +++-
 .../db/compaction/CompactionIterable.java          |   20 +---
 .../cassandra/db/compaction/CompactionManager.java |   16 +--
 .../cassandra/db/compaction/CompactionTask.java    |    7 +-
 .../db/compaction/ICompactionScanner.java          |   34 +++++
 .../db/compaction/LeveledCompactionStrategy.java   |  103 ++++++++++++++-
 .../cassandra/db/compaction/LeveledManifest.java   |    3 +-
 .../db/compaction/ParallelCompactionIterable.java  |   24 ++--
 .../apache/cassandra/io/sstable/SSTableReader.java |    2 +
 .../cassandra/io/sstable/SSTableScanner.java       |   13 ++-
 .../cassandra/service/AntiEntropyService.java      |    4 +-
 test/conf/cassandra.yaml                           |    1 +
 test/unit/org/apache/cassandra/SchemaLoader.java   |    9 +-
 .../compaction/LeveledCompactionStrategyTest.java  |   87 ++++++++++++
 .../cassandra/io/LazilyCompactedRowTest.java       |   14 +-
 17 files changed, 300 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 62234b2..83c171b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.1.1-dev
+ * Open 1 sstableScanner per level for leveled compaction (CASSANDRA-4142)
  * Optimize reads when row deletion timestamps allow us to restrict
    the set of sstables we check (CASSANDRA-4116)
  * incremental repair by token range (CASSANDRA-3912)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index 95e6590..8976f4e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.utils.CloseableIterator;
 
 public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
@@ -40,9 +39,9 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
     protected final CompactionController controller;
     protected final long totalBytes;
     protected volatile long bytesRead;
-    protected final List<SSTableScanner> scanners;
+    protected final List<ICompactionScanner> scanners;
 
-    public AbstractCompactionIterable(CompactionController controller, OperationType type, List<SSTableScanner> scanners)
+    public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ICompactionScanner> scanners)
     {
         this.controller = controller;
         this.type = type;
@@ -50,19 +49,11 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
         this.bytesRead = 0;
 
         long bytes = 0;
-        for (SSTableScanner scanner : scanners)
-            bytes += scanner.getFileLength();
+        for (ICompactionScanner scanner : scanners)
+            bytes += scanner.getLengthInBytes();
         this.totalBytes = bytes;
     }
 
-    protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
-    {
-        ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
-        for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getDirectScanner());
-        return scanners;
-    }
-
     public CompactionInfo getCompactionInfo()
     {
         return new CompactionInfo(this.hashCode(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 2e70c46..3f51b88 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -18,15 +18,17 @@
 
 package org.apache.cassandra.db.compaction;
 
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.StorageService;
 
-
 /**
  * Pluggable compaction strategy determines how SSTables get merged.
  *
@@ -126,4 +128,23 @@ public abstract class AbstractCompactionStrategy
 
         return filteredCandidates;
     }
+
+    /**
+     * Returns a list of ICompactionScanners for the given sstables, restricted to the given range.
+     * The default implementation simply opens one SSTableScanner per sstable, but overriding this method
+     * allows for a more memory-efficient solution when we know the sstables don't overlap (see
+     * LeveledCompactionStrategy for instance). A null range means each sstable is scanned in full.
+     */
+    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) throws IOException
+    {
+        ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
+        for (SSTableReader sstable : sstables)
+            scanners.add(sstable.getDirectScanner(range));
+        return scanners;
+    }
+
+    public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact) throws IOException
+    {
+        return getScanners(toCompact, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 8a4aa0e..0ead533 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
@@ -50,25 +49,12 @@ public class CompactionIterable extends AbstractCompactionIterable
         }
     };
 
-    public CompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
-    {
-        this(type, getScanners(sstables), controller);
-    }
-
-    protected CompactionIterable(OperationType type, List<SSTableScanner> scanners, CompactionController controller)
+    public CompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller)
     {
         super(controller, type, scanners);
         row = 0;
     }
 
-    protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
-    {
-        ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
-        for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getDirectScanner());
-        return scanners;
-    }
-
     public CloseableIterator<AbstractCompactedRow> iterator()
     {
         return MergeIterator.get(scanners, comparator, new Reducer());
@@ -116,8 +102,8 @@ public class CompactionIterable extends AbstractCompactionIterable
                 if ((row++ % 1000) == 0)
                 {
                     long n = 0;
-                    for (SSTableScanner scanner : scanners)
-                        n += scanner.getFilePointer();
+                    for (ICompactionScanner scanner : scanners)
+                        n += scanner.getCurrentPosition();
                     bytesRead = n;
                     controller.mayThrottle(bytesRead);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 0ecac5c..fd14593 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -760,7 +760,7 @@ public class CompactionManager implements CompactionManagerMBean
                         }
                     }
                     if ((rowsRead++ % 1000) == 0)
-                        controller.mayThrottle(scanner.getFilePointer());
+                        controller.mayThrottle(scanner.getCurrentPosition());
                 }
                 if (writer != null)
                     newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
@@ -986,17 +986,9 @@ public class CompactionManager implements CompactionManagerMBean
         public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range) throws IOException
         {
             super(OperationType.VALIDATION,
-                  getScanners(sstables, range),
+                  cfs.getCompactionStrategy().getScanners(sstables, range),
                   new CompactionController(cfs, sstables, getDefaultGcBefore(cfs), true));
         }
-
-        protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables, Range<Token> range) throws IOException
-        {
-            ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
-            for (SSTableReader sstable : sstables)
-                scanners.add(sstable.getDirectScanner(range));
-            return scanners;
-        }
     }
 
     public int getActiveCompactions()
@@ -1196,8 +1188,8 @@ public class CompactionManager implements CompactionManagerMBean
                                           sstable.descriptor.ksname,
                                           sstable.descriptor.cfname,
                                           OperationType.CLEANUP,
-                                          scanner.getFilePointer(),
-                                          scanner.getFileLength());
+                                          scanner.getCurrentPosition(),
+                                          scanner.getLengthInBytes());
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index e93725c..b66f20b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -116,15 +116,16 @@ public class CompactionTask extends AbstractCompactionTask
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
 
+        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
         long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
-        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
+        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / strategy.getMaxSSTableSize());
         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + keysPerSSTable);
 
         AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
-                                      ? new ParallelCompactionIterable(compactionType, toCompact, controller)
-                                      : new CompactionIterable(compactionType, toCompact, controller);
+                                      ? new ParallelCompactionIterable(compactionType, strategy.getScanners(toCompact), controller)
+                                      : new CompactionIterable(compactionType, strategy.getScanners(toCompact), controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
new file mode 100644
index 0000000..0b1a263
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+/**
+ * An ICompactionScanner is an abstraction allowing multiple SSTableScanners to be
+ * chained together under the hood.  See LeveledCompactionStrategy.getScanners.
+ */
+public interface ICompactionScanner extends CloseableIterator<IColumnIterator>
+{
+    public long getLengthInBytes();
+    public long getCurrentPosition();
+    public String getBackingFiles();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index d224e5e..1bc40fd 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -21,17 +21,22 @@ package org.apache.cassandra.db.compaction;
  */
 
 
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import com.google.common.base.Joiner;
+import com.google.common.collect.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -162,6 +167,100 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return Sets.difference(L0, sstablesToIgnore).size() + manifest.getLevelCount() > 20;
     }
 
+    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) throws IOException
+    {
+        Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
+        for (SSTableReader sstable : sstables)
+            byLevel.get(manifest.levelOf(sstable)).add(sstable);
+
+        List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
+        for (Integer level : ImmutableSortedSet.copyOf(byLevel.keySet()))
+            scanners.add(new LeveledScanner(new ArrayList<SSTableReader>(byLevel.get(level)), range));
+
+        return scanners;
+    }
+
+    // Lazily opens one scanner at a time over sstables that are assumed to be from the
+    // same level (i.e. non-overlapping) - see #4142
+    private static class LeveledScanner extends AbstractIterator<IColumnIterator> implements ICompactionScanner
+    {
+        private final Range<Token> range;
+        private final List<SSTableReader> sstables;
+        private final Iterator<SSTableReader> sstableIterator;
+        private final long totalLength;
+
+        private SSTableScanner currentScanner;
+        private long positionOffset;
+
+        public LeveledScanner(List<SSTableReader> sstables, Range<Token> range)
+        {
+            this.range = range;
+            this.sstables = sstables;
+
+            // Sorting a list passed in as an argument is bad practice, but this class is private and getScanners passes a fresh copy, so let's not bother
+            Collections.sort(sstables, SSTable.sstableComparator);
+            this.sstableIterator = sstables.iterator();
+
+            long length = 0;
+            for (SSTableReader sstable : sstables)
+                length += sstable.uncompressedLength();
+            totalLength = length;
+        }
+
+        protected IColumnIterator computeNext()
+        {
+            try
+            {
+                if (currentScanner != null)
+                {
+                    if (currentScanner.hasNext())
+                    {
+                        return currentScanner.next();
+                    }
+                    else
+                    {
+                        positionOffset += currentScanner.getLengthInBytes();
+                        currentScanner.close();
+                        currentScanner = null;
+                        return computeNext();
+                    }
+                }
+
+                if (!sstableIterator.hasNext())
+                    return endOfData();
+
+                SSTableReader reader = sstableIterator.next();
+                currentScanner = reader.getDirectScanner(range);
+                return computeNext();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void close() throws IOException
+        {
+            if (currentScanner != null)
+                currentScanner.close();
+        }
+
+        public long getLengthInBytes()
+        {
+            return totalLength;
+        }
+
+        public long getCurrentPosition()
+        {
+            return positionOffset + (currentScanner == null ? 0L : currentScanner.getCurrentPosition());
+        }
+
+        public String getBackingFiles()
+        {
+            return Joiner.on(", ").join(sstables);
+        }
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 592f0e9..69ab492 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -303,7 +303,6 @@ public class LeveledManifest
 
     public int getLevelSize(int i)
     {
-
         return generations.length > i ? generations[i].size() : 0;
     }
 
@@ -322,7 +321,7 @@ public class LeveledManifest
         }
     }
 
-    private int levelOf(SSTableReader sstable)
+    int levelOf(SSTableReader sstable)
     {
         Integer level = sstableGenerations.get(sstable);
         if (level == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 03a29cd..79b3396 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.utils.*;
 
 /**
@@ -61,17 +60,12 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
 
     private final int maxInMemorySize;
 
-    public ParallelCompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
+    public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller) throws IOException
     {
-        this(type, getScanners(sstables), controller, DatabaseDescriptor.getInMemoryCompactionLimit() / Iterables.size(sstables));
+        this(type, scanners, controller, DatabaseDescriptor.getInMemoryCompactionLimit() / scanners.size());
     }
 
-    public ParallelCompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller, int maxInMemorySize) throws IOException
-    {
-        this(type, getScanners(sstables), controller, maxInMemorySize);
-    }
-
-    protected ParallelCompactionIterable(OperationType type, List<SSTableScanner> scanners, CompactionController controller, int maxInMemorySize)
+    public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller, int maxInMemorySize)
     {
         super(controller, type, scanners);
         this.maxInMemorySize = maxInMemorySize;
@@ -80,7 +74,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
     public CloseableIterator<AbstractCompactedRow> iterator()
     {
         List<CloseableIterator<RowContainer>> sources = new ArrayList<CloseableIterator<RowContainer>>(scanners.size());
-        for (SSTableScanner scanner : scanners)
+        for (ICompactionScanner scanner : scanners)
             sources.add(new Deserializer(scanner, maxInMemorySize));
         return new Unwrapper(MergeIterator.get(sources, RowContainer.comparator, new Reducer()), controller);
     }
@@ -164,8 +158,8 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
             if ((row++ % 1000) == 0)
             {
                 long n = 0;
-                for (SSTableScanner scanner : scanners)
-                    n += scanner.getFilePointer();
+                for (ICompactionScanner scanner : scanners)
+                    n += scanner.getCurrentPosition();
                 bytesRead = n;
                 controller.mayThrottle(bytesRead);
             }
@@ -283,9 +277,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
         private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1);
         private static final RowContainer finished = new RowContainer((Row) null);
         private Condition condition;
-        private final SSTableScanner scanner;
+        private final ICompactionScanner scanner;
 
-        public Deserializer(SSTableScanner ssts, final int maxInMemorySize)
+        public Deserializer(ICompactionScanner ssts, final int maxInMemorySize)
         {
             this.scanner = ssts;
             Runnable runnable = new WrappedRunnable()
@@ -318,7 +312,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                     }
                 }
             };
-            new Thread(runnable, "Deserialize " + scanner.sstable).start();
+            new Thread(runnable, "Deserialize " + scanner.getBackingFiles()).start();
         }
 
         protected RowContainer computeNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index c332ae6..2482188 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -847,6 +847,8 @@ public class SSTableReader extends SSTable
     */
     public SSTableScanner getDirectScanner(Range<Token> range)
     {
+        if (range == null)
+            return getDirectScanner();
         return new SSTableBoundedScanner(this, true, range);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 5e4f269..26ed908 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -27,15 +27,15 @@ import java.util.Iterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CloseableIterator;
 
-public class SSTableScanner implements CloseableIterator<IColumnIterator>
+public class SSTableScanner implements ICompactionScanner
 {
     private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class);
 
@@ -107,7 +107,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator>
         }
     }
 
-    public long getFileLength()
+    public long getLengthInBytes()
     {
         try
         {
@@ -119,11 +119,16 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator>
         }
     }
 
-    public long getFilePointer()
+    public long getCurrentPosition()
     {
         return file.getFilePointer();
     }
 
+    public String getBackingFiles()
+    {
+        return sstable.toString();
+    }
+
     public boolean hasNext()
     {
         if (iterator == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 0c11947..d76d26f 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -266,7 +266,7 @@ public class AntiEntropyService
 
         public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
 
-        Validator(TreeRequest request)
+        public Validator(TreeRequest request)
         {
             this(request,
                  // TODO: memory usage (maxsize) should either be tunable per
@@ -546,7 +546,7 @@ public class AntiEntropyService
     /**
      * A tuple of table and cf.
      */
-    static class CFPair extends Pair<String,String>
+    public static class CFPair extends Pair<String,String>
     {
         public CFPair(String table, String cf)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 315a75a..3578493 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -33,3 +33,4 @@ encryption_options:
     truststore_password: cassandra
 incremental_backups: true
 flush_largest_memtables_at: 1.0
+compaction_throughput_mb_per_sec: 0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 8087ea6..1d3bd83 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -28,6 +28,7 @@ import com.google.common.base.Charsets;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.gms.Gossiper;
@@ -140,6 +141,10 @@ public class SchemaLoader
             null,
             null));
 
+        // Make it easy to test leveled compaction
+        Map<String, String> leveledOptions = new HashMap<String, String>();
+        leveledOptions.put("sstable_size_in_mb", "1");
+
         // Keyspace 1
         schema.add(KSMetaData.testMetadata(ks1,
                                            simple,
@@ -198,7 +203,9 @@ public class SchemaLoader
                                                           "StandardDynamicComposite",
                                                           st,
                                                           dynamicComposite,
-                                                          null)));
+                                                          null),
+                                           standardCFMD(ks1, "StandardLeveled").compactionStrategyClass(LeveledCompactionStrategy.class)
+                                                                               .compactionStrategyOptions(leveledOptions)));
 
         // Keyspace 2
         schema.add(KSMetaData.testMetadata(ks2,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
new file mode 100644
index 0000000..56f12de
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class LeveledCompactionStrategyTest extends SchemaLoader
+{
+    // Exercises the code path of CASSANDRA-4142: validation over a leveled CF with several sstables per level
+    @Test
+    public void testValidationMultipleSSTablePerLevel() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "StandardLeveled";
+        Table table = Table.open(ksname);
+        ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
+
+        // Enough data to have a level 1 and 2
+        int rows = 20;
+        int columns = 10;
+
+        // Adds enough data to trigger multiple sstables per level: one row mutation and one flush per row
+        for (int r = 0; r < rows; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            for (int c = 0; c < columns; c++)
+            {
+                rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes("column" + c)), value, 0);
+            }
+            rm.apply();
+            store.forceFlush();
+        }
+
+        LeveledCompactionStrategy strat = (LeveledCompactionStrategy)store.getCompactionStrategy();
+
+        // Drain L0; bounded so a stalled compaction fails the test instead of hanging it forever
+        for (int attempt = 0; strat.getLevelSize(0) > 0 && attempt < 100; attempt++)
+        {
+            store.forceMajorCompaction();
+            Thread.sleep(200);
+        }
+        assertEquals("L0 should have been fully compacted", 0, strat.getLevelSize(0));
+        // Checking we're not completely bad at math (JUnit asserts run even without -ea)
+        assertTrue("expected sstables in L1", strat.getLevelSize(1) > 0);
+        assertTrue("expected sstables in L2", strat.getLevelSize(2) > 0);
+
+        AntiEntropyService.CFPair p = new AntiEntropyService.CFPair(ksname, cfname);
+        Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
+        AntiEntropyService.TreeRequest req = new AntiEntropyService.TreeRequest("1", FBUtilities.getLocalAddress(), range, p);
+        AntiEntropyService.Validator validator = new AntiEntropyService.Validator(req);
+        CompactionManager.instance.submitValidation(store, validator).get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index 22ffe97..0f48b3d 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -57,23 +57,24 @@ public class LazilyCompactedRowTest extends SchemaLoader
 {
     private static void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
     {
+        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
         Collection<SSTableReader> sstables = cfs.getSSTables();
 
         // compare eager and lazy compactions
         AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN,
-                                                                  sstables,
+                                                                  strategy.getScanners(sstables),
                                                                   new PreCompactingController(cfs, sstables, gcBefore, false));
         AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN,
-                                                                 sstables,
+                                                                 strategy.getScanners(sstables),
                                                                  new LazilyCompactingController(cfs, sstables, gcBefore, false));
         assertBytes(cfs, sstables, eager, lazy);
 
         // compare eager and parallel-lazy compactions
         eager = new CompactionIterable(OperationType.UNKNOWN,
-                                       sstables,
+                                       strategy.getScanners(sstables),
                                        new PreCompactingController(cfs, sstables, gcBefore, false));
         AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
-                                                                             sstables,
+                                                                             strategy.getScanners(sstables),
                                                                              new CompactionController(cfs, sstables, gcBefore, false),
                                                                              0);
         assertBytes(cfs, sstables, eager, parallel);
@@ -155,9 +156,10 @@ public class LazilyCompactedRowTest extends SchemaLoader
 
     private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws IOException, NoSuchAlgorithmException
     {
+        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false));
-        AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false));
+        AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore, false));
+        AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore, false));
         CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
         CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();