You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/07 18:31:53 UTC
svn commit: r1166255 - in /cassandra/trunk: ./ conf/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/columniterator/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/io/...
Author: jbellis
Date: Wed Sep 7 16:31:52 2011
New Revision: 1166255
URL: http://svn.apache.org/viewvc?rev=1166255&view=rev
Log:
parallel compaction
patch by jbellis; reviewed by slebresne for CASSANDRA-2901
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Sep 7 16:31:52 2011
@@ -58,6 +58,9 @@
* expose the ability to repair the first range (as returned by the
partitioner) of a node (CASSANDRA-2606)
* Streams Compression (CASSANDRA-3015)
+ * add ability to use multiple threads during a single compaction
+ (CASSANDRA-2901)
+
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Wed Sep 7 16:31:52 2011
@@ -23,6 +23,8 @@ Features
Columnfamily compaction_strategy=LeveledCompactionStrategy option.
Leveled compaction means you only need to keep a few MB of space free for
compaction instead of (in the worst case) 50%.
+ - Ability to use multiple threads during a single compaction. See
+ multithreaded_compaction in cassandra.yaml for more details.
- Windows Service ("cassandra.bat install" to enable)
- Hinted Handoff has two major improvements:
- Hint replay is much more efficient thanks to a change in the data model
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Wed Sep 7 16:31:52 2011
@@ -268,17 +268,27 @@ column_index_size_in_kb: 64
in_memory_compaction_limit_in_mb: 64
# Number of simultaneous compactions to allow, NOT including
-# validation "compactions" for anti-entropy repair. This defaults to
-# the number of cores. This can help preserve read performance in a
-# mixed read/write workload, by mitigating the tendency of small
-# sstables to accumulate during a single long running compactions. The
-# default is usually fine and if you experience problems with
-# compaction running too slowly or too fast, you should look at
+# validation "compactions" for anti-entropy repair. Simultaneous
+# compactions can help preserve read performance in a mixed read/write
+# workload, by mitigating the tendency of small sstables to accumulate
+# during a single long-running compaction. The default is usually
+# fine, and if you experience problems with compaction running too
+# slowly or too fast, you should look at
# compaction_throughput_mb_per_sec first.
#
-# Uncomment to make compaction mono-threaded.
+# This setting has no effect on LeveledCompactionStrategy.
+#
+# concurrent_compactors defaults to the number of cores.
+# Uncomment to make compaction mono-threaded, the pre-0.8 default.
#concurrent_compactors: 1
+# Multi-threaded compaction. When enabled, each compaction will use
+# up to one thread per core, plus one thread per sstable being merged.
+# This is usually only useful for SSD-based hardware: otherwise,
+# your concern is usually to get compaction to do LESS i/o (see:
+# compaction_throughput_mb_per_sec), not more.
+multithreaded_compaction: false
+
# Throttles compaction to the given total throughput across the entire
# system. The faster you insert data, the faster you need to compact in
# order to keep the sstable count down, but in general, setting this to
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Wed Sep 7 16:31:52 2011
@@ -83,9 +83,10 @@ public class Config
public Integer in_memory_compaction_limit_in_mb = 256;
public Integer concurrent_compactors = Runtime.getRuntime().availableProcessors();
public Integer compaction_throughput_mb_per_sec = 16;
+ public Boolean multithreaded_compaction = false;
public Integer stream_throughput_outbound_megabits_per_sec;
-
+
public String[] data_file_directories;
public String saved_caches_directory;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Wed Sep 7 16:31:52 2011
@@ -674,6 +674,11 @@ public class DatabaseDescriptor
return conf.concurrent_compactors;
}
+ public static boolean isMultithreadedCompaction()
+ {
+ return conf.multithreaded_compaction;
+ }
+
public static int getCompactionThroughputMbPerSec()
{
return conf.compaction_throughput_mb_per_sec;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/EchoedRow.java Wed Sep 7 16:31:52 2011
@@ -68,7 +68,7 @@ public class EchoedRow extends AbstractC
public int columnCount()
{
- return row.columnCount;
+ return row.getColumnCount();
}
public long maxTimestamp()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java Wed Sep 7 16:31:52 2011
@@ -32,7 +32,8 @@ import org.apache.cassandra.utils.Closea
public interface IColumnIterator extends CloseableIterator<IColumn>
{
/**
- * @return An empty CF holding metadata for the row being iterated.
+ * @return A ColumnFamily holding metadata for the row being iterated.
+ * Do not modify this CF. Whether it is empty or not is implementation-dependent.
*/
public abstract ColumnFamily getColumnFamily();
Added: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java?rev=1166255&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/ICountableColumnIterator.java Wed Sep 7 16:31:52 2011
@@ -0,0 +1,8 @@
+package org.apache.cassandra.db.columniterator;
+
+public interface ICountableColumnIterator extends IColumnIterator
+{
+ public int getColumnCount();
+
+ public void reset();
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java Wed Sep 7 16:31:52 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.compacti
*/
+import java.io.Closeable;
import java.io.DataOutput;
import java.io.IOException;
import java.security.MessageDigest;
@@ -42,12 +43,19 @@ public abstract class AbstractCompactedR
}
/**
- * write the row (size + column index + filter + column data, but NOT row key) to @param out
+ * write the row (size + column index + filter + column data, but NOT row key) to @param out.
+ * It is an error to call this if isEmpty() is true. (Because the key is appended first,
+ * so we'd have an incomplete row written.)
+ *
+ * write() may change internal state; it is NOT valid to call write() or update() a second time.
*/
public abstract long write(DataOutput out) throws IOException;
/**
- * update @param digest with the data bytes of the row (not including row key or row size)
+ * update @param digest with the data bytes of the row (not including row key or row size).
+ * May be called even if empty.
+ *
+ * update() may change internal state; it is NOT valid to call write() or update() a second time.
*/
public abstract void update(MessageDigest digest);
Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java?rev=1166255&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java Wed Sep 7 16:31:52 2011
@@ -0,0 +1,69 @@
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.Throttle;
+
+public abstract class AbstractCompactionIterable implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
+{
+ public static final int FILE_BUFFER_SIZE = 1024 * 1024;
+
+ private static Logger logger = LoggerFactory.getLogger(CompactionIterable.class);
+
+ protected final CompactionType type;
+ protected final CompactionController controller;
+ protected long totalBytes;
+ protected volatile long bytesRead;
+
+ protected final Throttle throttle;
+
+ public AbstractCompactionIterable(CompactionController controller, CompactionType type)
+ {
+ this.controller = controller;
+ this.type = type;
+ this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
+ {
+ /** @return Instantaneous throughput target in bytes per millisecond. */
+ public int targetThroughput()
+ {
+ if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
+ // throttling disabled
+ return 0;
+ // total throughput
+ int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+ // per stream throughput (target bytes per MS)
+ return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
+ }
+ });
+ }
+
+ protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
+ {
+ ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
+ for (SSTableReader sstable : sstables)
+ scanners.add(sstable.getDirectScanner(FILE_BUFFER_SIZE));
+ return scanners;
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(this.hashCode(),
+ controller.getKeyspace(),
+ controller.getColumnFamily(),
+ type,
+ bytesRead,
+ totalBytes);
+ }
+
+ public abstract CloseableIterator<AbstractCompactedRow> iterator();
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java Wed Sep 7 16:31:52 2011
@@ -29,31 +29,27 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Throttle;
-public class CompactionIterable implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
+public class CompactionIterable extends AbstractCompactionIterable
{
private static Logger logger = LoggerFactory.getLogger(CompactionIterable.class);
- public static final int FILE_BUFFER_SIZE = 1024 * 1024;
-
- private MergeIterator<IColumnIterator, AbstractCompactedRow> source;
- protected final CompactionType type;
+ private long row;
private final List<SSTableScanner> scanners;
- protected final CompactionController controller;
- private final Throttle throttle;
- private long totalBytes;
- private long bytesRead;
- private long row;
+ private static final Comparator<IColumnIterator> comparator = new Comparator<IColumnIterator>()
+ {
+ public int compare(IColumnIterator i1, IColumnIterator i2)
+ {
+ return i1.getKey().compareTo(i2.getKey());
+ }
+ };
public CompactionIterable(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
{
@@ -62,27 +58,12 @@ public class CompactionIterable implemen
protected CompactionIterable(CompactionType type, List<SSTableScanner> scanners, CompactionController controller)
{
- this.type = type;
+ super(controller, type);
this.scanners = scanners;
- this.controller = controller;
row = 0;
totalBytes = bytesRead = 0;
for (SSTableScanner scanner : scanners)
totalBytes += scanner.getFileLength();
- this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
- {
- /** @return Instantaneous throughput target in bytes per millisecond. */
- public int targetThroughput()
- {
- if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
- // throttling disabled
- return 0;
- // total throughput
- int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
- // per stream throughput (target bytes per MS)
- return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
- }
- });
}
protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
@@ -93,33 +74,23 @@ public class CompactionIterable implemen
return scanners;
}
- public CompactionInfo getCompactionInfo()
- {
- return new CompactionInfo(this.hashCode(),
- controller.getKeyspace(),
- controller.getColumnFamily(),
- type,
- bytesRead,
- totalBytes);
- }
-
public CloseableIterator<AbstractCompactedRow> iterator()
{
- return MergeIterator.get(scanners, ICOMP, new Reducer());
+ return MergeIterator.get(scanners, comparator, new Reducer());
}
public String toString()
{
return this.getCompactionInfo().toString();
}
-
+
protected class Reducer extends MergeIterator.Reducer<IColumnIterator, AbstractCompactedRow>
{
protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
public void reduce(IColumnIterator current)
{
- rows.add((SSTableIdentityIterator)current);
+ rows.add((SSTableIdentityIterator) current);
}
protected AbstractCompactedRow getReduced()
@@ -134,11 +105,13 @@ public class CompactionIterable implemen
controller.invalidateCachedRow(compactedRow.key);
return null;
}
-
- // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
- // like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
- // memory on long running instances
- controller.removeDeletedInCache(compactedRow.key);
+ else
+ {
+ // If the row is cached, we call removeDeleted on it to keep query results coherent. However, it would look
+ // like some deleted columns lived longer than gc_grace + compaction. This can also free up a large amount of
+ // memory on long-running instances.
+ controller.removeDeletedInCache(compactedRow.key);
+ }
return compactedRow;
}
@@ -147,22 +120,13 @@ public class CompactionIterable implemen
rows.clear();
if ((row++ % 1000) == 0)
{
- bytesRead = 0;
+ long n = 0;
for (SSTableScanner scanner : scanners)
- {
- bytesRead += scanner.getFilePointer();
- }
+ n += scanner.getFilePointer();
+ bytesRead = n;
throttle.throttle(bytesRead);
}
}
}
}
-
- public final static Comparator<IColumnIterator> ICOMP = new Comparator<IColumnIterator>()
- {
- public int compare(IColumnIterator i1, IColumnIterator i2)
- {
- return i1.getKey().compareTo(i2.getKey());
- }
- };
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Sep 7 16:31:52 2011
@@ -699,7 +699,7 @@ public class CompactionManager implement
if (compactionFileLocation == null)
throw new IOException("disk full");
- SSTableScanner scanner = sstable.getDirectScanner(CompactionIterable.FILE_BUFFER_SIZE);
+ SSTableScanner scanner = sstable.getDirectScanner(AbstractCompactionIterable.FILE_BUFFER_SIZE);
Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
CleanupInfo ci = new CleanupInfo(sstable, scanner);
executor.beginCompaction(ci);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Wed Sep 7 16:31:52 2011
@@ -128,7 +128,9 @@ public class CompactionTask extends Abst
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
- CompactionIterable ci = new CompactionIterable(type, toCompact, controller); // retain a handle so we can call close()
+ AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
+ ? new ParallelCompactionIterable(type, toCompact, controller)
+ : new CompactionIterable(type, toCompact, controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Wed Sep 7 16:31:52 2011
@@ -24,6 +24,7 @@ package org.apache.cassandra.db.compacti
import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.security.MessageDigest;
import java.util.Iterator;
import java.util.List;
@@ -35,8 +36,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.IIterableColumns;
import org.apache.cassandra.utils.MergeIterator;
@@ -56,7 +57,7 @@ public class LazilyCompactedRow extends
{
private static Logger logger = LoggerFactory.getLogger(LazilyCompactedRow.class);
- private final List<SSTableIdentityIterator> rows;
+ private final List<? extends ICountableColumnIterator> rows;
private final CompactionController controller;
private final boolean shouldPurge;
private final DataOutputBuffer headerBuffer;
@@ -65,8 +66,9 @@ public class LazilyCompactedRow extends
private int columnCount;
private long maxTimestamp;
private long columnSerializedSize;
+ private boolean closed;
- public LazilyCompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
+ public LazilyCompactedRow(CompactionController controller, List<? extends ICountableColumnIterator> rows)
{
super(rows.get(0).getKey());
this.rows = rows;
@@ -96,6 +98,8 @@ public class LazilyCompactedRow extends
public long write(DataOutput out) throws IOException
{
+ assert !closed;
+
DataOutputBuffer clockOut = new DataOutputBuffer();
ColumnFamily.serializer().serializeCFInfo(emptyColumnFamily, clockOut);
@@ -119,11 +123,14 @@ public class LazilyCompactedRow extends
assert secondPassColumnSize == columnSerializedSize
: "originally calculated column size of " + columnSerializedSize + " but now it is " + secondPassColumnSize;
+ close();
return dataSize;
}
public void update(MessageDigest digest)
{
+ assert !closed;
+
// no special-case for rows.size == 1, we're actually skipping some bytes here so just
// blindly updating everything wouldn't be correct
DataOutputBuffer out = new DataOutputBuffer();
@@ -144,6 +151,7 @@ public class LazilyCompactedRow extends
{
iter.next().updateDigest(digest);
}
+ close();
}
public boolean isEmpty()
@@ -155,8 +163,8 @@ public class LazilyCompactedRow extends
public int getEstimatedColumnCount()
{
int n = 0;
- for (SSTableIdentityIterator row : rows)
- n += row.columnCount;
+ for (ICountableColumnIterator row : rows)
+ n += row.getColumnCount();
return n;
}
@@ -167,7 +175,7 @@ public class LazilyCompactedRow extends
public Iterator<IColumn> iterator()
{
- for (SSTableIdentityIterator row : rows)
+ for (ICountableColumnIterator row : rows)
row.reset();
reducer = new Reducer();
Iterator<IColumn> iter = MergeIterator.get(rows, getComparator().columnComparator, reducer);
@@ -184,6 +192,22 @@ public class LazilyCompactedRow extends
return maxTimestamp;
}
+ private void close()
+ {
+ for (IColumnIterator row : rows)
+ {
+ try
+ {
+ row.close();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ closed = true;
+ }
+
private class Reducer extends MergeIterator.Reducer<IColumn, IColumn>
{
ColumnFamily container = emptyColumnFamily.cloneMeShallow();
Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java?rev=1166255&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java Wed Sep 7 16:31:52 2011
@@ -0,0 +1,456 @@
+package org.apache.cassandra.db.compaction;
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.Condition;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.utils.*;
+
+/**
+ * A class to run compaction taking advantage of multi-core processors:
+ *
+ * One Deserializer thread per input sstable performs read + deserialize (a row at a time).
+ * The resulting ColumnFamilies are added to a queue, which is fed to the merge Reducer.
+ *
+ * The merge Reducer submits a MergeTask per fully in-memory row to a thread-per-core Executor and returns
+ * CompactedRowContainers holding the Future of each merge; the Unwrapper waits on those futures in order.
+ *
+ * The main complication is in handling larger-than-memory rows. When one is encountered, no further deserialization
+ * is done from that sstable until that row is merged and written -- creating a pipeline stall, as it were. Thus, this
+ * is intended to be useful with mostly-in-memory row sizes, but preserves correctness in the face of occasional
+ * oversized rows.
+ *
+ * The iterator returned by iterator() should always be closed: closing it stops the deserializer threads and
+ * shuts down the merge executor.
+ */
+public class ParallelCompactionIterable extends AbstractCompactionIterable
+{
+    private static final Logger logger = LoggerFactory.getLogger(ParallelCompactionIterable.class);
+
+    private final List<SSTableScanner> scanners;
+    private final int maxInMemorySize;
+
+    /**
+     * Splits the configured in-memory compaction limit evenly between the input sstables.
+     */
+    public ParallelCompactionIterable(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
+    {
+        // Math.max guards against a division by zero when there is nothing to compact
+        this(type, getScanners(sstables), controller, DatabaseDescriptor.getInMemoryCompactionLimit() / Math.max(1, Iterables.size(sstables)));
+    }
+
+    /**
+     * @param maxInMemorySize rows whose dataSize exceeds this value are merged lazily instead of being
+     *                        deserialized eagerly by their Deserializer thread
+     */
+    public ParallelCompactionIterable(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller, int maxInMemorySize) throws IOException
+    {
+        this(type, getScanners(sstables), controller, maxInMemorySize);
+    }
+
+    protected ParallelCompactionIterable(CompactionType type, List<SSTableScanner> scanners, CompactionController controller, int maxInMemorySize)
+    {
+        super(controller, type);
+        this.scanners = scanners;
+        this.maxInMemorySize = maxInMemorySize;
+    }
+
+    /**
+     * Starts one Deserializer thread per scanner and a merge executor; both are released by closing
+     * the returned iterator.
+     */
+    public CloseableIterator<AbstractCompactedRow> iterator()
+    {
+        List<CloseableIterator<RowContainer>> sources = new ArrayList<CloseableIterator<RowContainer>>();
+        for (SSTableScanner scanner : scanners)
+            sources.add(new Deserializer(scanner, maxInMemorySize));
+        Reducer reducer = new Reducer();
+        return new Unwrapper(MergeIterator.get(sources, RowContainer.comparator, reducer), reducer, controller);
+    }
+
+    /**
+     * Turns the merged CompactedRowContainers back into AbstractCompactedRows, waiting for asynchronous
+     * merges to finish. Rows that compacted away entirely are returned as null.
+     */
+    private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow>
+    {
+        private final MergeIterator<RowContainer, CompactedRowContainer> merger;
+        private final Reducer reducer;
+        private final CompactionController controller;
+
+        public Unwrapper(MergeIterator<RowContainer, CompactedRowContainer> merger, Reducer reducer, CompactionController controller)
+        {
+            this.merger = merger;
+            this.reducer = reducer;
+            this.controller = controller;
+        }
+
+        protected AbstractCompactedRow computeNext()
+        {
+            if (!merger.hasNext())
+            {
+                // every submitted merge has already been waited on; release the merge threads now
+                reducer.shutdown();
+                return endOfData();
+            }
+
+            CompactedRowContainer container = merger.next();
+            AbstractCompactedRow compactedRow;
+            try
+            {
+                compactedRow = container.future == null
+                             ? container.row
+                             : new PrecompactedRow(container.key, controller, container.future.get());
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
+
+            if (compactedRow.isEmpty())
+            {
+                controller.invalidateCachedRow(compactedRow.key);
+                return null;
+            }
+            else
+            {
+                // If the row is cached, we call removeDeleted on it to have coherent query returns. However it would
+                // look like some deleted columns lived longer than gc_grace + compaction. This can also free up a big
+                // amount of memory on long running instances
+                controller.removeDeletedInCache(compactedRow.key);
+                return compactedRow;
+            }
+        }
+
+        public void close() throws IOException
+        {
+            try
+            {
+                merger.close();
+            }
+            finally
+            {
+                reducer.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Collects the RowContainers sharing a key and turns them into a CompactedRowContainer: an asynchronous
+     * MergeTask when every version is in memory, otherwise a LazilyCompactedRow.
+     */
+    private class Reducer extends MergeIterator.Reducer<RowContainer, CompactedRowContainer>
+    {
+        private final List<RowContainer> rows = new ArrayList<RowContainer>();
+
+        private final ThreadPoolExecutor executor;
+        private int row = 0;
+
+        private Reducer()
+        {
+            super();
+            int processors = Runtime.getRuntime().availableProcessors();
+            executor = new ThreadPoolExecutor(processors,
+                                              processors,
+                                              Integer.MAX_VALUE,
+                                              TimeUnit.MILLISECONDS,
+                                              new SynchronousQueue<Runnable>(),
+                                              new NamedThreadFactory("CompactionReducer"));
+            // a SynchronousQueue rejects work whenever every thread is busy; the blocking handler presumably
+            // makes the submitting (merge) thread wait for a free worker instead -- confirm in DebuggableThreadPoolExecutor
+            executor.setRejectedExecutionHandler(DebuggableThreadPoolExecutor.blockingExecutionHandler);
+        }
+
+        /**
+         * Stops the merge threads. Merges that were already submitted still run to completion.
+         * Safe to call more than once.
+         */
+        void shutdown()
+        {
+            executor.shutdown();
+        }
+
+        public void reduce(RowContainer current)
+        {
+            rows.add(current);
+        }
+
+        protected CompactedRowContainer getReduced()
+        {
+            assert rows.size() > 0;
+
+            CompactedRowContainer compacted = getCompactedRow(rows);
+            rows.clear();
+            // refresh bytesRead and apply throttling once every 1000 rows
+            if ((row++ % 1000) == 0)
+            {
+                long n = 0;
+                for (SSTableScanner scanner : scanners)
+                    n += scanner.getFilePointer();
+                bytesRead = n;
+                throttle.throttle(bytesRead);
+            }
+            return compacted;
+        }
+
+        public CompactedRowContainer getCompactedRow(List<RowContainer> rows)
+        {
+            boolean inMemory = true;
+            for (RowContainer container : rows)
+            {
+                if (container.row == null)
+                {
+                    inMemory = false;
+                    break;
+                }
+            }
+
+            // the list is copied because the caller clears it as soon as we return
+            if (inMemory)
+                return new CompactedRowContainer(rows.get(0).getKey(), executor.submit(new MergeTask(new ArrayList<RowContainer>(rows))));
+
+            List<ICountableColumnIterator> iterators = new ArrayList<ICountableColumnIterator>();
+            for (RowContainer container : rows)
+                iterators.add(container.row == null ? container.wrapper : new DeserializedColumnIterator(container.row));
+            return new CompactedRowContainer(new LazilyCompactedRow(controller, iterators));
+        }
+
+        /**
+         * Merges fully deserialized versions of one row into the first version's ColumnFamily, then purges it.
+         */
+        private class MergeTask implements Callable<ColumnFamily>
+        {
+            private final List<RowContainer> rows;
+
+            public MergeTask(List<RowContainer> rows)
+            {
+                this.rows = rows;
+            }
+
+            public ColumnFamily call() throws Exception
+            {
+                ColumnFamily cf = null;
+                for (RowContainer container : rows)
+                {
+                    ColumnFamily thisCF = container.row.cf;
+                    if (cf == null)
+                    {
+                        cf = thisCF;
+                    }
+                    else
+                    {
+                        cf.addAll(thisCF, HeapAllocator.instance);
+                    }
+                }
+
+                return PrecompactedRow.removeDeletedAndOldShards(rows.get(0).getKey(), controller, cf);
+            }
+        }
+
+        /**
+         * Exposes an already deserialized Row through the ICountableColumnIterator interface so it can be
+         * merged lazily alongside rows that are still on disk.
+         */
+        private class DeserializedColumnIterator implements ICountableColumnIterator
+        {
+            private final Row row;
+            private Iterator<IColumn> iter;
+
+            public DeserializedColumnIterator(Row row)
+            {
+                this.row = row;
+                iter = row.cf.iterator();
+            }
+
+            public ColumnFamily getColumnFamily()
+            {
+                return row.cf;
+            }
+
+            public DecoratedKey getKey()
+            {
+                return row.key;
+            }
+
+            public int getColumnCount()
+            {
+                return row.cf.getColumnCount();
+            }
+
+            public void reset()
+            {
+                iter = row.cf.iterator();
+            }
+
+            public void close() throws IOException {}
+
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            public IColumn next()
+            {
+                return iter.next();
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        }
+    }
+
+    /**
+     * Reads one sstable on a dedicated thread and hands its rows to the merge through a one-slot queue.
+     * A failure on the reading thread is rethrown to the consumer instead of leaving it blocked forever.
+     */
+    private static class Deserializer extends AbstractIterator<RowContainer> implements CloseableIterator<RowContainer>
+    {
+        private static final RowContainer finished = new RowContainer((Row) null);
+
+        // capacity 1 bounds how far the reading thread can run ahead of the merge
+        private final LinkedBlockingQueue<RowContainer> queue = new LinkedBlockingQueue<RowContainer>(1);
+        private final SSTableScanner scanner;
+        private final int maxInMemorySize;
+        private final Thread thread;
+        // set by the reading thread if it dies; reported to the consumer when it reaches the end marker
+        private volatile Throwable failure;
+
+        public Deserializer(SSTableScanner ssts, int maxInMemorySize)
+        {
+            this.scanner = ssts;
+            this.maxInMemorySize = maxInMemorySize;
+            thread = new Thread(new Runnable()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        deserializeAll();
+                    }
+                    catch (Throwable t)
+                    {
+                        failure = t;
+                        // This thread is the only producer, so after clear() there is room and offer() cannot
+                        // fail. Any pending row is dropped on purpose: the consumer will throw on seeing `failure`.
+                        queue.clear();
+                        queue.offer(finished);
+                    }
+                }
+            }, "Deserialize " + scanner.sstable);
+            thread.start();
+        }
+
+        /**
+         * Reads every row of the scanner, enqueueing it either fully deserialized or, when larger than
+         * maxInMemorySize, as a lazily-read wrapper; finishes with the end marker.
+         */
+        private void deserializeAll() throws Exception
+        {
+            // Non-null once a large row has been handed off lazily: wait until that row is closed (which
+            // signals the condition) before advancing the scanner again -- the pipeline stall described
+            // in the class javadoc.
+            Condition condition = null;
+            while (true)
+            {
+                if (condition != null)
+                    condition.await();
+
+                if (!scanner.hasNext())
+                {
+                    queue.put(finished);
+                    return;
+                }
+
+                SSTableIdentityIterator iter = (SSTableIdentityIterator) scanner.next();
+                if (iter.dataSize > maxInMemorySize)
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("parallel lazy deserialize from " + iter.getPath());
+                    condition = new SimpleCondition();
+                    queue.put(new RowContainer(new NotifyingSSTableIdentityIterator(iter, condition)));
+                }
+                else
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("parallel eager deserialize from " + iter.getPath());
+                    queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns())));
+                }
+            }
+        }
+
+        protected RowContainer computeNext()
+        {
+            RowContainer container;
+            try
+            {
+                container = queue.take();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            if (container != finished)
+                return container;
+
+            Throwable t = failure;
+            if (t != null)
+                throw new RuntimeException("Error deserializing " + scanner.sstable, t);
+            return endOfData();
+        }
+
+        public void close() throws IOException
+        {
+            // If the consumer stops early, the reading thread could otherwise block forever in queue.put()
+            // or condition.await(); interrupting a thread that already finished is a no-op.
+            thread.interrupt();
+            scanner.close();
+        }
+    }
+
+    /**
+     * a wrapper around SSTII that notifies the given condition when it is closed
+     */
+    private static class NotifyingSSTableIdentityIterator implements ICountableColumnIterator
+    {
+        private final SSTableIdentityIterator wrapped;
+        private final Condition condition;
+
+        public NotifyingSSTableIdentityIterator(SSTableIdentityIterator wrapped, Condition condition)
+        {
+            this.wrapped = wrapped;
+            this.condition = condition;
+        }
+
+        public ColumnFamily getColumnFamily()
+        {
+            return wrapped.getColumnFamily();
+        }
+
+        public DecoratedKey getKey()
+        {
+            return wrapped.getKey();
+        }
+
+        public int getColumnCount()
+        {
+            return wrapped.getColumnCount();
+        }
+
+        public void reset()
+        {
+            wrapped.reset();
+        }
+
+        public void close() throws IOException
+        {
+            try
+            {
+                wrapped.close();
+            }
+            finally
+            {
+                // always release the waiting Deserializer, even if closing the wrapped iterator failed
+                condition.signal();
+            }
+        }
+
+        public boolean hasNext()
+        {
+            return wrapped.hasNext();
+        }
+
+        public IColumn next()
+        {
+            return wrapped.next();
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class RowContainer
+    {
+        // either row is not null, or wrapper is not null. But not both.
+        public final Row row;
+        public final NotifyingSSTableIdentityIterator wrapper;
+        public static final Comparator<RowContainer> comparator = new Comparator<RowContainer>()
+        {
+            public int compare(RowContainer o1, RowContainer o2)
+            {
+                return o1.getKey().compareTo(o2.getKey());
+            }
+        };
+
+        private RowContainer(Row row)
+        {
+            this.row = row;
+            wrapper = null;
+        }
+
+        public RowContainer(NotifyingSSTableIdentityIterator wrapper)
+        {
+            this.wrapper = wrapper;
+            row = null;
+        }
+
+        public DecoratedKey getKey()
+        {
+            return row == null ? wrapper.getKey() : row.key;
+        }
+    }
+
+    private static class CompactedRowContainer
+    {
+        /** the row key; only set together with "future" (a lazy row carries its own key). */
+        public final DecoratedKey key;
+        /** either "future" or "row" will be not-null, but not both at once. */
+        public final Future<ColumnFamily> future;
+        public final LazilyCompactedRow row;
+
+        private CompactedRowContainer(DecoratedKey key, Future<ColumnFamily> future)
+        {
+            this.key = key;
+            this.future = future;
+            row = null;
+        }
+
+        private CompactedRowContainer(LazilyCompactedRow row)
+        {
+            this.row = row;
+            future = null;
+            key = null;
+        }
+    }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java Wed Sep 7 16:31:52 2011
@@ -56,6 +56,14 @@ public class PrecompactedRow extends Abs
this.gcBefore = Integer.MAX_VALUE;
}
+    /**
+     * Wraps an already-compacted column family without any further processing.
+     *
+     * It is the caller's responsibility to call removeDeleted + removeOldShards on {@code cf}
+     * (for instance through removeDeletedAndOldShards) before calling this constructor.
+     *
+     * @param key        key of the row
+     * @param controller supplies the gcBefore value recorded for this row
+     * @param cf         the merged and purged column family
+     */
+    public PrecompactedRow(DecoratedKey key, CompactionController controller, ColumnFamily cf)
+    {
+        super(key);
+        compactedCf = cf;
+        gcBefore = controller.gcBefore;
+    }
+
public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf)
{
return removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf);
@@ -71,9 +79,9 @@ public class PrecompactedRow extends Abs
+ /**
+ * Merges the given source rows and applies removeDeletedAndOldShards to the result.
+ * The row key is taken from the first source row (presumably all sources share it -- callers should ensure this).
+ */
public PrecompactedRow(CompactionController controller, List<SSTableIdentityIterator> rows)
{
- super(rows.get(0).getKey());
- gcBefore = controller.gcBefore;
- compactedCf = removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows));
+ this(rows.get(0).getKey(),
+ controller,
+ removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows)));
}
private static ColumnFamily merge(List<SSTableIdentityIterator> rows)
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Sep 7 16:31:52 2011
@@ -31,11 +31,12 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.db.columniterator.ICountableColumnIterator;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.BytesReadTracker;
-public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
+public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, ICountableColumnIterator
{
private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class);
@@ -46,7 +47,7 @@ public class SSTableIdentityIterator imp
public final boolean fromRemote;
private final ColumnFamily columnFamily;
- public final int columnCount;
+ private final int columnCount;
private long columnPosition;
private BytesReadTracker inputWithTracker; // tracks bytes read
@@ -271,4 +272,9 @@ public class SSTableIdentityIterator imp
}
inputWithTracker.reset(headerSize());
}
+
+    /**
+     * Returns the column count of this row, as held in the private {@code columnCount} field.
+     *
+     * @return the number of columns recorded for this row
+     */
+    public int getColumnCount()
+    {
+        return this.columnCount;
+    }
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1166255&r1=1166254&r2=1166255&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Wed Sep 7 16:31:52 2011
@@ -21,25 +21,23 @@ package org.apache.cassandra.io;
*/
-import static junit.framework.Assert.assertEquals;
-
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.*;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.sstable.IndexHelper;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
@@ -48,30 +46,54 @@ import org.apache.cassandra.io.util.Data
import org.apache.cassandra.io.util.MappedFileDataInput;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
-import org.junit.Test;
+import static junit.framework.Assert.assertEquals;
public class LazilyCompactedRowTest extends CleanupHelper
{
+ /**
+ * Compacts the sstables of cfs twice per comparison and checks the serialized rows match:
+ * eager vs. lazy CompactionIterable, then eager CompactionIterable vs. ParallelCompactionIterable
+ * with maxInMemorySize 0 (so every row with a positive dataSize takes the lazy path).
+ */
- private void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
+ private static void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws IOException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
- Iterator<AbstractCompactedRow> ci1 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)).iterator();
- Iterator<AbstractCompactedRow> ci2 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator();
+
+ // compare eager and lazy compactions
+ AbstractCompactionIterable eager = new CompactionIterable(CompactionType.UNKNOWN,
+ sstables,
+ new PreCompactingController(cfs, sstables, gcBefore, false));
+ AbstractCompactionIterable lazy = new CompactionIterable(CompactionType.UNKNOWN,
+ sstables,
+ new LazilyCompactingController(cfs, sstables, gcBefore, false));
+ assertBytes(sstables, eager, lazy);
+
+ // compare eager and parallel-lazy compactions
+ eager = new CompactionIterable(CompactionType.UNKNOWN,
+ sstables,
+ new PreCompactingController(cfs, sstables, gcBefore, false));
+ AbstractCompactionIterable parallel = new ParallelCompactionIterable(CompactionType.UNKNOWN,
+ sstables,
+ new CompactionController(cfs, sstables, gcBefore, false),
+ 0);
+ assertBytes(sstables, eager, parallel);
+ }
+
+ private static void assertBytes(Collection<SSTableReader> sstables, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
+ {
+ CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
+ CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
while (true)
{
- if (!ci1.hasNext())
+ if (!iter1.hasNext())
{
- assert !ci2.hasNext();
+ assert !iter2.hasNext();
break;
}
- AbstractCompactedRow row1 = ci1.next();
- AbstractCompactedRow row2 = ci2.next();
+ AbstractCompactedRow row1 = iter1.next();
+ AbstractCompactedRow row2 = iter2.next();
DataOutputBuffer out1 = new DataOutputBuffer();
DataOutputBuffer out2 = new DataOutputBuffer();
row1.write(out1);
@@ -94,8 +116,8 @@ public class LazilyCompactedRowTest exte
// row size can differ b/c of bloom filter counts being different
long rowSize1 = SSTableReader.readRowSize(in1, sstables.iterator().next().descriptor);
long rowSize2 = SSTableReader.readRowSize(in2, sstables.iterator().next().descriptor);
- assertEquals(out1.getLength(), rowSize1 + 8);
- assertEquals(out2.getLength(), rowSize2 + 8);
+ assertEquals(rowSize1 + 8, out1.getLength());
+ assertEquals(rowSize2 + 8, out2.getLength());
// bloom filter
IndexHelper.defreezeBloomFilter(in1, rowSize1, false);
IndexHelper.defreezeBloomFilter(in2, rowSize2, false);
@@ -115,7 +137,7 @@ public class LazilyCompactedRowTest exte
ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf1, in1);
ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf2, in2);
assert cf1.getLocalDeletionTime() == cf2.getLocalDeletionTime();
- assert cf1.getMarkedForDeleteAt() == cf2.getMarkedForDeleteAt();
+ assert cf1.getMarkedForDeleteAt() == cf2.getMarkedForDeleteAt();
// columns
int columns = in1.readInt();
assert columns == in2.readInt();
@@ -130,23 +152,25 @@ public class LazilyCompactedRowTest exte
assert in2.available() == 0;
}
}
-
+
private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws IOException, NoSuchAlgorithmException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
- Iterator<AbstractCompactedRow> ci1 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false)).iterator();
- Iterator<AbstractCompactedRow> ci2 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator();
+ AbstractCompactionIterable ci1 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new PreCompactingController(cfs, sstables, gcBefore, false));
+ AbstractCompactionIterable ci2 = new CompactionIterable(CompactionType.UNKNOWN, sstables, new LazilyCompactingController(cfs, sstables, gcBefore, false));
+ CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
+ CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
while (true)
{
- if (!ci1.hasNext())
+ if (!iter1.hasNext())
{
- assert !ci2.hasNext();
+ assert !iter2.hasNext();
break;
}
- AbstractCompactedRow row1 = ci1.next();
- AbstractCompactedRow row2 = ci2.next();
+ AbstractCompactedRow row1 = iter1.next();
+ AbstractCompactedRow row2 = iter2.next();
MessageDigest digest1 = MessageDigest.getInstance("MD5");
MessageDigest digest2 = MessageDigest.getInstance("MD5");