You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/05/02 19:48:48 UTC
[2/4] git commit: Open one sstableScanner per level for leveled
compaction patch by slebresne and jbellis for CASSANDRA-4142
Open one sstableScanner per level for leveled compaction
patch by slebresne and jbellis for CASSANDRA-4142
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/46e422a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/46e422a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/46e422a9
Branch: refs/heads/cassandra-1.1
Commit: 46e422a9417b5b513ceae4c9652ba413e2ede474
Parents: 1686a36
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Apr 27 16:19:24 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed May 2 12:48:23 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/AbstractCompactionIterable.java | 17 +--
.../db/compaction/AbstractCompactionStrategy.java | 23 +++-
.../db/compaction/CompactionIterable.java | 20 +---
.../cassandra/db/compaction/CompactionManager.java | 16 +--
.../cassandra/db/compaction/CompactionTask.java | 7 +-
.../db/compaction/ICompactionScanner.java | 34 +++++
.../db/compaction/LeveledCompactionStrategy.java | 103 ++++++++++++++-
.../cassandra/db/compaction/LeveledManifest.java | 3 +-
.../db/compaction/ParallelCompactionIterable.java | 24 ++--
.../apache/cassandra/io/sstable/SSTableReader.java | 2 +
.../cassandra/io/sstable/SSTableScanner.java | 13 ++-
.../cassandra/service/AntiEntropyService.java | 4 +-
test/conf/cassandra.yaml | 1 +
test/unit/org/apache/cassandra/SchemaLoader.java | 9 +-
.../compaction/LeveledCompactionStrategyTest.java | 87 ++++++++++++
.../cassandra/io/LazilyCompactedRowTest.java | 14 +-
17 files changed, 300 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 62234b2..83c171b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.1.1-dev
+ * Open 1 sstableScanner per level for leveled compaction (CASSANDRA-4142)
* Optimize reads when row deletion timestamps allow us to restrict
the set of sstables we check (CASSANDRA-4116)
* incremental repair by token range (CASSANDRA-3912)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index 95e6590..8976f4e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
import org.apache.cassandra.utils.CloseableIterator;
public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
@@ -40,9 +39,9 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
protected final CompactionController controller;
protected final long totalBytes;
protected volatile long bytesRead;
- protected final List<SSTableScanner> scanners;
+ protected final List<ICompactionScanner> scanners;
- public AbstractCompactionIterable(CompactionController controller, OperationType type, List<SSTableScanner> scanners)
+ public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ICompactionScanner> scanners)
{
this.controller = controller;
this.type = type;
@@ -50,19 +49,11 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
this.bytesRead = 0;
long bytes = 0;
- for (SSTableScanner scanner : scanners)
- bytes += scanner.getFileLength();
+ for (ICompactionScanner scanner : scanners)
+ bytes += scanner.getLengthInBytes();
this.totalBytes = bytes;
}
- protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
- {
- ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
- for (SSTableReader sstable : sstables)
- scanners.add(sstable.getDirectScanner());
- return scanners;
- }
-
public CompactionInfo getCompactionInfo()
{
return new CompactionInfo(this.hashCode(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 2e70c46..3f51b88 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -18,15 +18,17 @@
package org.apache.cassandra.db.compaction;
+import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.StorageService;
-
/**
* Pluggable compaction strategy determines how SSTables get merged.
*
@@ -126,4 +128,23 @@ public abstract class AbstractCompactionStrategy
return filteredCandidates;
}
+
+ /**
+ * Returns a list of KeyScanners given sstables and a range on which to scan.
+ * The default implementation simply grab one SSTableScanner per-sstable, but overriding this method
+ * allow for a more memory efficient solution if we know the sstable don't overlap (see
+ * LeveledCompactionStrategy for instance).
+ */
+ public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) throws IOException
+ {
+ ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
+ for (SSTableReader sstable : sstables)
+ scanners.add(sstable.getDirectScanner(range));
+ return scanners;
+ }
+
+ public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact) throws IOException
+ {
+ return getScanners(toCompact, null);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 8a4aa0e..0ead533 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.MergeIterator;
@@ -50,25 +49,12 @@ public class CompactionIterable extends AbstractCompactionIterable
}
};
- public CompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
- {
- this(type, getScanners(sstables), controller);
- }
-
- protected CompactionIterable(OperationType type, List<SSTableScanner> scanners, CompactionController controller)
+ public CompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller)
{
super(controller, type, scanners);
row = 0;
}
- protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
- {
- ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
- for (SSTableReader sstable : sstables)
- scanners.add(sstable.getDirectScanner());
- return scanners;
- }
-
public CloseableIterator<AbstractCompactedRow> iterator()
{
return MergeIterator.get(scanners, comparator, new Reducer());
@@ -116,8 +102,8 @@ public class CompactionIterable extends AbstractCompactionIterable
if ((row++ % 1000) == 0)
{
long n = 0;
- for (SSTableScanner scanner : scanners)
- n += scanner.getFilePointer();
+ for (ICompactionScanner scanner : scanners)
+ n += scanner.getCurrentPosition();
bytesRead = n;
controller.mayThrottle(bytesRead);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 0ecac5c..fd14593 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -760,7 +760,7 @@ public class CompactionManager implements CompactionManagerMBean
}
}
if ((rowsRead++ % 1000) == 0)
- controller.mayThrottle(scanner.getFilePointer());
+ controller.mayThrottle(scanner.getCurrentPosition());
}
if (writer != null)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
@@ -986,17 +986,9 @@ public class CompactionManager implements CompactionManagerMBean
public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range) throws IOException
{
super(OperationType.VALIDATION,
- getScanners(sstables, range),
+ cfs.getCompactionStrategy().getScanners(sstables, range),
new CompactionController(cfs, sstables, getDefaultGcBefore(cfs), true));
}
-
- protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables, Range<Token> range) throws IOException
- {
- ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
- for (SSTableReader sstable : sstables)
- scanners.add(sstable.getDirectScanner(range));
- return scanners;
- }
}
public int getActiveCompactions()
@@ -1196,8 +1188,8 @@ public class CompactionManager implements CompactionManagerMBean
sstable.descriptor.ksname,
sstable.descriptor.cfname,
OperationType.CLEANUP,
- scanner.getFilePointer(),
- scanner.getFileLength());
+ scanner.getCurrentPosition(),
+ scanner.getLengthInBytes());
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index e93725c..b66f20b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -116,15 +116,16 @@ public class CompactionTask extends AbstractCompactionTask
long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
long estimatedTotalKeys = Math.max(DatabaseDescriptor.getIndexInterval(), SSTableReader.getApproximateKeyCount(toCompact));
- long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / cfs.getCompactionStrategy().getMaxSSTableSize());
+ long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(toCompact) / strategy.getMaxSSTableSize());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + keysPerSSTable);
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
- ? new ParallelCompactionIterable(compactionType, toCompact, controller)
- : new CompactionIterable(compactionType, toCompact, controller);
+ ? new ParallelCompactionIterable(compactionType, strategy.getScanners(toCompact), controller)
+ : new CompactionIterable(compactionType, strategy.getScanners(toCompact), controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
new file mode 100644
index 0000000..0b1a263
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+/**
+ * An ICompactionScanner is an abstraction allowing multiple SSTableScanners to be
+ * chained together under the hood. See LeveledCompactionStrategy.getScanners.
+ */
+public interface ICompactionScanner extends CloseableIterator<IColumnIterator>
+{
+ public long getLengthInBytes();
+ public long getCurrentPosition();
+ public String getBackingFiles();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index d224e5e..1bc40fd 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -21,17 +21,22 @@ package org.apache.cassandra.db.compaction;
*/
+import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import com.google.common.base.Joiner;
+import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableScanner;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -162,6 +167,100 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
return Sets.difference(L0, sstablesToIgnore).size() + manifest.getLevelCount() > 20;
}
+ public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range) throws IOException
+ {
+ Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
+ for (SSTableReader sstable : sstables)
+ byLevel.get(manifest.levelOf(sstable)).add(sstable);
+
+ List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
+ for (Integer level : ImmutableSortedSet.copyOf(byLevel.keySet()))
+ scanners.add(new LeveledScanner(new ArrayList<SSTableReader>(byLevel.get(level)), range));
+
+ return scanners;
+ }
+
+ // Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the
+ // same level (e.g. non overlapping) - see #4142
+ private static class LeveledScanner extends AbstractIterator<IColumnIterator> implements ICompactionScanner
+ {
+ private final Range<Token> range;
+ private final List<SSTableReader> sstables;
+ private final Iterator<SSTableReader> sstableIterator;
+ private final long totalLength;
+
+ private SSTableScanner currentScanner;
+ private long positionOffset;
+
+ public LeveledScanner(List<SSTableReader> sstables, Range<Token> range)
+ {
+ this.range = range;
+ this.sstables = sstables;
+
+ // Sorting a list we got in argument is bad but it's all private to this class so let's not bother
+ Collections.sort(sstables, SSTable.sstableComparator);
+ this.sstableIterator = sstables.iterator();
+
+ long length = 0;
+ for (SSTableReader sstable : sstables)
+ length += sstable.uncompressedLength();
+ totalLength = length;
+ }
+
+ protected IColumnIterator computeNext()
+ {
+ try
+ {
+ if (currentScanner != null)
+ {
+ if (currentScanner.hasNext())
+ {
+ return currentScanner.next();
+ }
+ else
+ {
+ positionOffset += currentScanner.getLengthInBytes();
+ currentScanner.close();
+ currentScanner = null;
+ return computeNext();
+ }
+ }
+
+ if (!sstableIterator.hasNext())
+ return endOfData();
+
+ SSTableReader reader = sstableIterator.next();
+ currentScanner = reader.getDirectScanner(range);
+ return computeNext();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void close() throws IOException
+ {
+ if (currentScanner != null)
+ currentScanner.close();
+ }
+
+ public long getLengthInBytes()
+ {
+ return totalLength;
+ }
+
+ public long getCurrentPosition()
+ {
+ return positionOffset + (currentScanner == null ? 0L : currentScanner.getCurrentPosition());
+ }
+
+ public String getBackingFiles()
+ {
+ return Joiner.on(", ").join(sstables);
+ }
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 592f0e9..69ab492 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -303,7 +303,6 @@ public class LeveledManifest
public int getLevelSize(int i)
{
-
return generations.length > i ? generations[i].size() : 0;
}
@@ -322,7 +321,7 @@ public class LeveledManifest
}
}
- private int levelOf(SSTableReader sstable)
+ int levelOf(SSTableReader sstable)
{
Integer level = sstableGenerations.get(sstable);
if (level == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 03a29cd..79b3396 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
import org.apache.cassandra.utils.*;
/**
@@ -61,17 +60,12 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
private final int maxInMemorySize;
- public ParallelCompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
+ public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller) throws IOException
{
- this(type, getScanners(sstables), controller, DatabaseDescriptor.getInMemoryCompactionLimit() / Iterables.size(sstables));
+ this(type, scanners, controller, DatabaseDescriptor.getInMemoryCompactionLimit() / scanners.size());
}
- public ParallelCompactionIterable(OperationType type, Iterable<SSTableReader> sstables, CompactionController controller, int maxInMemorySize) throws IOException
- {
- this(type, getScanners(sstables), controller, maxInMemorySize);
- }
-
- protected ParallelCompactionIterable(OperationType type, List<SSTableScanner> scanners, CompactionController controller, int maxInMemorySize)
+ public ParallelCompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller, int maxInMemorySize)
{
super(controller, type, scanners);
this.maxInMemorySize = maxInMemorySize;
@@ -80,7 +74,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
public CloseableIterator<AbstractCompactedRow> iterator()
{
List<CloseableIterator<RowContainer>> sources = new ArrayList<CloseableIterator<RowContainer>>(scanners.size());
- for (SSTableScanner scanner : scanners)
+ for (ICompactionScanner scanner : scanners)
sources.add(new Deserializer(scanner, maxInMemorySize));
return new Unwrapper(MergeIterator.get(sources, RowContainer.comparator, new Reducer()), controller);
}
@@ -164,8 +158,8 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
if ((row++ % 1000) == 0)
{
long n = 0;
- for (SSTableScanner scanner : scanners)
- n += scanner.getFilePointer();
+ for (ICompactionScanner scanner : scanners)
+ n += scanner.getCurrentPosition();
bytesRead = n;
controller.mayThrottle(bytesRead);
}
@@ -283,9 +277,9 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1);
private static final RowContainer finished = new RowContainer((Row) null);
private Condition condition;
- private final SSTableScanner scanner;
+ private final ICompactionScanner scanner;
- public Deserializer(SSTableScanner ssts, final int maxInMemorySize)
+ public Deserializer(ICompactionScanner ssts, final int maxInMemorySize)
{
this.scanner = ssts;
Runnable runnable = new WrappedRunnable()
@@ -318,7 +312,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
}
}
};
- new Thread(runnable, "Deserialize " + scanner.sstable).start();
+ new Thread(runnable, "Deserialize " + scanner.getBackingFiles()).start();
}
protected RowContainer computeNext()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index c332ae6..2482188 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -847,6 +847,8 @@ public class SSTableReader extends SSTable
*/
public SSTableScanner getDirectScanner(Range<Token> range)
{
+ if (range == null)
+ return getDirectScanner();
return new SSTableBoundedScanner(this, true, range);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 5e4f269..26ed908 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -27,15 +27,15 @@ import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CloseableIterator;
-public class SSTableScanner implements CloseableIterator<IColumnIterator>
+public class SSTableScanner implements ICompactionScanner
{
private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class);
@@ -107,7 +107,7 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator>
}
}
- public long getFileLength()
+ public long getLengthInBytes()
{
try
{
@@ -119,11 +119,16 @@ public class SSTableScanner implements CloseableIterator<IColumnIterator>
}
}
- public long getFilePointer()
+ public long getCurrentPosition()
{
return file.getFilePointer();
}
+ public String getBackingFiles()
+ {
+ return sstable.toString();
+ }
+
public boolean hasNext()
{
if (iterator == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 0c11947..d76d26f 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -266,7 +266,7 @@ public class AntiEntropyService
public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
- Validator(TreeRequest request)
+ public Validator(TreeRequest request)
{
this(request,
// TODO: memory usage (maxsize) should either be tunable per
@@ -546,7 +546,7 @@ public class AntiEntropyService
/**
* A tuple of table and cf.
*/
- static class CFPair extends Pair<String,String>
+ public static class CFPair extends Pair<String,String>
{
public CFPair(String table, String cf)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 315a75a..3578493 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -33,3 +33,4 @@ encryption_options:
truststore_password: cassandra
incremental_backups: true
flush_largest_memtables_at: 1.0
+compaction_throughput_mb_per_sec: 0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 8087ea6..1d3bd83 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -28,6 +28,7 @@ import com.google.common.base.Charsets;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.gms.Gossiper;
@@ -140,6 +141,10 @@ public class SchemaLoader
null,
null));
+ // Make it easy to test leveled compaction
+ Map<String, String> leveledOptions = new HashMap<String, String>();
+ leveledOptions.put("sstable_size_in_mb", "1");
+
// Keyspace 1
schema.add(KSMetaData.testMetadata(ks1,
simple,
@@ -198,7 +203,9 @@ public class SchemaLoader
"StandardDynamicComposite",
st,
dynamicComposite,
- null)));
+ null),
+ standardCFMD(ks1, "StandardLeveled").compactionStrategyClass(LeveledCompactionStrategy.class)
+ .compactionStrategyOptions(leveledOptions)));
// Keyspace 2
schema.add(KSMetaData.testMetadata(ks2,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
new file mode 100644
index 0000000..56f12de
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class LeveledCompactionStrategyTest extends SchemaLoader
+{
+ /*
+ * This excercise in particular the code of #4142
+ */
+ @Test
+ public void testValidationMultipleSSTablePerLevel() throws Exception
+ {
+ String ksname = "Keyspace1";
+ String cfname = "StandardLeveled";
+ Table table = Table.open(ksname);
+ ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
+
+ // Enough data to have a level 1 and 2
+ int rows = 20;
+ int columns = 10;
+
+ // Adds enough data to trigger multiple sstable per level
+ for (int r = 0; r < rows; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ RowMutation rm = new RowMutation(ksname, key.key);
+ for (int c = 0; c < columns; c++)
+ {
+ rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes("column" + c)), value, 0);
+ }
+ rm.apply();
+ store.forceFlush();
+ }
+
+ LeveledCompactionStrategy strat = (LeveledCompactionStrategy)store.getCompactionStrategy();
+
+ while (strat.getLevelSize(0) > 0)
+ {
+ store.forceMajorCompaction();
+ Thread.sleep(200);
+ }
+ // Checking we're not completely bad at math
+ assert strat.getLevelSize(1) > 0;
+ assert strat.getLevelSize(2) > 0;
+
+ AntiEntropyService.CFPair p = new AntiEntropyService.CFPair(ksname, cfname);
+ Range<Token> range = new Range<Token>(Util.token(""), Util.token(""));
+ AntiEntropyService.TreeRequest req = new AntiEntropyService.TreeRequest("1", FBUtilities.getLocalAddress(), range, p);
+ AntiEntropyService.Validator validator = new AntiEntropyService.Validator(req);
+ CompactionManager.instance.submitValidation(store, validator).get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/46e422a9/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index 22ffe97..0f48b3d 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -57,23 +57,24 @@ public class LazilyCompactedRowTest extends SchemaLoader
{
private static void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
{
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
Collection<SSTableReader> sstables = cfs.getSSTables();
// compare eager and lazy compactions
AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN,
- sstables,
+ strategy.getScanners(sstables),
new PreCompactingController(cfs, sstables, gcBefore, false));
AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN,
- sstables,
+ strategy.getScanners(sstables),
new LazilyCompactingController(cfs, sstables, gcBefore, false));
assertBytes(cfs, sstables, eager, lazy);
// compare eager and parallel-lazy compactions
eager = new CompactionIterable(OperationType.UNKNOWN,
- sstables,
+ strategy.getScanners(sstables),
new PreCompactingController(cfs, sstables, gcBefore, false));
AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
- sstables,
+ strategy.getScanners(sstables),
new CompactionController(cfs, sstables, gcBefore, false),
0);
assertBytes(cfs, sstables, eager, parallel);
@@ -155,9 +156,10 @@ public class LazilyCompactedRowTest extends SchemaLoader
private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws IOException, NoSuchAlgorithmException
{
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
Collection<SSTableReader> sstables = cfs.getSSTables();
- AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false));
- AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false));
+ AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore, false));
+ AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore, false));
CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();