You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/01/05 16:55:47 UTC
[2/2] cassandra git commit: Make sure the same token does not exist
in several data directories
Make sure the same token does not exist in several data directories
Patch by marcuse; reviewed by Yuki Morishita and Carl Yeksigian for CASSANDRA-6696
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2c63418
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2c63418
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2c63418
Branch: refs/heads/cassandra-3.2
Commit: e2c6341898fa43b0e262ef031f267587050b8d0f
Parents: 1568293
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 3 10:12:47 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jan 5 16:48:50 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 5 +
conf/cassandra.yaml | 13 +-
.../org/apache/cassandra/config/Config.java | 2 +-
.../cassandra/config/DatabaseDescriptor.java | 3 -
.../cassandra/db/BlacklistedDirectories.java | 10 +
.../db/BlacklistedDirectoriesMBean.java | 9 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 97 ++++-
.../org/apache/cassandra/db/Directories.java | 36 +-
src/java/org/apache/cassandra/db/Memtable.java | 193 +++++-----
.../db/compaction/CompactionManager.java | 78 +++-
.../compaction/CompactionStrategyManager.java | 361 +++++++++++++------
.../cassandra/db/compaction/CompactionTask.java | 2 +-
.../DateTieredCompactionStrategy.java | 3 +-
.../db/compaction/LeveledManifest.java | 2 +-
.../cassandra/db/compaction/OperationType.java | 3 +-
.../cassandra/db/compaction/Scrubber.java | 7 +-
.../SizeTieredCompactionStrategy.java | 9 +-
.../writers/CompactionAwareWriter.java | 91 ++++-
.../writers/DefaultCompactionWriter.java | 17 +-
.../writers/MajorLeveledCompactionWriter.java | 53 +--
.../writers/MaxSSTableSizeWriter.java | 22 +-
.../SplittingSizeTieredCompactionWriter.java | 17 +-
.../apache/cassandra/db/lifecycle/Tracker.java | 4 +-
.../org/apache/cassandra/db/lifecycle/View.java | 2 +-
.../org/apache/cassandra/dht/IPartitioner.java | 17 +
.../cassandra/dht/Murmur3Partitioner.java | 23 ++
.../apache/cassandra/dht/RandomPartitioner.java | 24 ++
src/java/org/apache/cassandra/dht/Range.java | 17 +
src/java/org/apache/cassandra/dht/Splitter.java | 124 +++++++
.../io/sstable/SimpleSSTableMultiWriter.java | 7 +-
.../sstable/format/RangeAwareSSTableWriter.java | 205 +++++++++++
.../cassandra/io/util/DiskAwareRunnable.java | 17 +-
.../cassandra/service/StorageService.java | 70 ++++
.../cassandra/service/StorageServiceMBean.java | 1 +
.../cassandra/streaming/StreamReader.java | 11 +-
.../cassandra/streaming/StreamSession.java | 4 +-
.../cassandra/streaming/StreamWriter.java | 2 +-
.../compress/CompressedStreamReader.java | 3 +-
.../compress/CompressedStreamWriter.java | 3 +-
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/tools/NodeTool.java | 3 +-
.../cassandra/tools/SSTableOfflineRelevel.java | 21 +-
.../tools/nodetool/RelocateSSTables.java | 49 +++
.../LongLeveledCompactionStrategyTest.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 6 +-
.../db/compaction/CompactionsCQLTest.java | 5 +-
.../LeveledCompactionStrategyTest.java | 36 +-
.../cassandra/db/lifecycle/TrackerTest.java | 6 +-
.../apache/cassandra/db/lifecycle/ViewTest.java | 3 +-
.../apache/cassandra/dht/LengthPartitioner.java | 6 +
.../org/apache/cassandra/dht/SplitterTest.java | 158 ++++++++
52 files changed, 1509 insertions(+), 361 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c3a50f..43acd43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.2
+ * Make sure tokens don't exist in several data directories (CASSANDRA-6696)
* Add requireAuthorization method to IAuthorizer (CASSANDRA-10852)
* Move static JVM options to conf/jvm.options file (CASSANDRA-10494)
* Fix CassandraVersion to accept x.y version string (CASSANDRA-10931)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1269c98..f5f50c1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,11 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - We now make sure that a token does not exist in several data directories. This
+ means that we run one compaction strategy per data_file_directory and, by default,
+ one flush thread per directory. Use nodetool relocatesstables to make sure your
+ tokens are in the correct place, or just wait and compaction will handle it. See
+ CASSANDRA-6696 for more details.
- bound maximum in-flight commit log replay mutation bytes to 64 megabytes
tunable via cassandra.commitlog_max_outstanding_replay_bytes
- Support for type casting has been added to the selection clause.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 74e1d1d..a8a90ba 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -413,14 +413,13 @@ memtable_allocation_type: heap_buffers
# This sets the amount of memtable flush writer threads. These will
# be blocked by disk io, and each one will hold a memtable in memory
-# while blocked.
+# while blocked.
#
-# memtable_flush_writers defaults to the smaller of (number of disks,
-# number of cores), with a minimum of 2 and a maximum of 8.
-#
-# If your data directories are backed by SSD, you should increase this
-# to the number of cores.
-#memtable_flush_writers: 8
+# memtable_flush_writers defaults to one per data_file_directory.
+#
+# If your data directories are backed by SSD, you can increase this, but avoid
+# letting memtable_flush_writers * (number of data_file_directories) exceed the number of cores.
+#memtable_flush_writers: 1
# A fixed memory pool size in MB for for SSTable index summaries. If left
# empty, this will default to 5% of the heap size. If the memory usage of
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2bace5e..54cb089 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -98,7 +98,7 @@ public class Config
@Deprecated
public Integer concurrent_replicates = null;
- public Integer memtable_flush_writers = null;
+ public Integer memtable_flush_writers = 1;
public Integer memtable_heap_space_in_mb;
public Integer memtable_offheap_space_in_mb;
public Float memtable_cleanup_threshold = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3fc0b31..c82e930 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -582,9 +582,6 @@ public class DatabaseDescriptor
if (conf.hints_directory.equals(conf.saved_caches_directory))
throw new ConfigurationException("saved_caches_directory must not be the same as the hints_directory", false);
- if (conf.memtable_flush_writers == null)
- conf.memtable_flush_writers = Math.min(8, Math.max(2, Math.min(FBUtilities.getAvailableProcessors(), conf.data_file_directories.length)));
-
if (conf.memtable_flush_writers < 1)
throw new ConfigurationException("memtable_flush_writers must be at least 1, but was " + conf.memtable_flush_writers, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f47fd57..3e6332c 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -66,6 +66,16 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
return Collections.unmodifiableSet(unwritableDirectories);
}
+ public void markUnreadable(String path)
+ {
+ maybeMarkUnreadable(new File(path));
+ }
+
+ public void markUnwritable(String path)
+ {
+ maybeMarkUnwritable(new File(path));
+ }
+
/**
* Adds parent directory of the file (or the file itself, if it is a directory)
* to the set of unreadable directories.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java b/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
index 3163b9a..3fb9f39 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.db;
import java.io.File;
import java.util.Set;
-public interface BlacklistedDirectoriesMBean {
-
+public interface BlacklistedDirectoriesMBean
+{
public Set<File> getUnreadableDirectories();
public Set<File> getUnwritableDirectories();
-
+
+ public void markUnreadable(String path);
+
+ public void markUnwritable(String path);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 738c941..6eefb5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -118,13 +118,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(1,
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("MemtableFlushWriter"),
"internal");
+ private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
+ static
+ {
+ for (int i = 0; i < DatabaseDescriptor.getAllDataFileLocations().length; i++)
+ {
+ perDiskflushExecutors[i] = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("PerDiskMemtableFlushWriter_"+i),
+ "internal");
+ }
+ }
+
// post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
StageManager.KEEPALIVE,
@@ -458,7 +472,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public Directories getDirectories()
{
- return directories;
+ // todo, hack since we need to know the data directories when constructing the compaction strategy
+ if (directories != null)
+ return directories;
+ return new Directories(metadata, initialDirectories);
}
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
@@ -1033,11 +1050,74 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (Memtable memtable : memtables)
{
- // flush the memtable
- MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+ List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+ long totalBytesOnDisk = 0;
+ long maxBytesOnDisk = 0;
+ long minBytesOnDisk = Long.MAX_VALUE;
+ List<SSTableReader> sstables = new ArrayList<>();
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
+ {
+ // flush the memtable
+ List<Memtable.FlushRunnable> flushRunnables = memtable.flushRunnables(txn);
+
+ for (int i = 0; i < flushRunnables.size(); i++)
+ futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
+ List<SSTableMultiWriter> flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
+
+ try
+ {
+ Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+ while (writerIterator.hasNext())
+ {
+ SSTableMultiWriter writer = writerIterator.next();
+ if (writer.getFilePointer() > 0)
+ {
+ writer.setOpenResult(true).prepareToCommit();
+ }
+ else
+ {
+ maybeFail(writer.abort(null));
+ writerIterator.remove();
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ for (SSTableMultiWriter writer : flushResults)
+ t = writer.abort(t);
+ t = txn.abort(t);
+ Throwables.propagate(t);
+ }
+
+ txn.prepareToCommit();
+
+ Throwable accumulate = null;
+ for (SSTableMultiWriter writer : flushResults)
+ accumulate = writer.commit(accumulate);
+
+ maybeFail(txn.commit(accumulate));
+
+ for (SSTableMultiWriter writer : flushResults)
+ {
+ Collection<SSTableReader> flushedSSTables = writer.finished();
+ for (SSTableReader sstable : flushedSSTables)
+ {
+ if (sstable != null)
+ {
+ sstables.add(sstable);
+ long size = sstable.bytesOnDisk();
+ totalBytesOnDisk += size;
+ maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+ minBytesOnDisk = Math.min(minBytesOnDisk, size);
+ }
+ }
+ }
+ }
+ memtable.cfs.replaceFlushed(memtable, sstables);
reclaim(memtable);
+ logger.debug("Flushed to {} ({} sstables, {} bytes), biggest {} bytes, smallest {} bytes", sstables, sstables.size(), totalBytesOnDisk, maxBytesOnDisk, minBytesOnDisk);
}
-
// signal the post-flush we've done our work
postFlush.latch.countDown();
}
@@ -1366,6 +1446,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion);
}
+ public CompactionManager.AllSSTableOpStatus relocateSSTables() throws ExecutionException, InterruptedException
+ {
+ return CompactionManager.instance.relocateSSTables(this);
+ }
+
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
{
assert !sstables.isEmpty();
@@ -2206,7 +2291,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public int getUnleveledSSTables()
{
- return this.compactionStrategyManager.getUnleveledSSTables();
+ return compactionStrategyManager.getUnleveledSSTables();
}
public int[] getSSTableCountPerLevel()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 8744d43..a572bed 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -289,6 +290,19 @@ public class Directories
return null;
}
+ public DataDirectory getDataDirectoryForFile(File directory)
+ {
+ if (directory != null)
+ {
+ for (DataDirectory dataDirectory : dataDirectories)
+ {
+ if (directory.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
+ return dataDirectory;
+ }
+ }
+ return null;
+ }
+
public Descriptor find(String filename)
{
for (File dir : dataPaths)
@@ -403,7 +417,7 @@ public class Directories
for (DataDirectory dataDir : paths)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
- continue;
+ continue;
DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
// exclude directory if its total writeSize does not fit to data directory
if (candidate.availableSpace < writeSize)
@@ -413,6 +427,26 @@ public class Directories
return totalAvailable > expectedTotalWriteSize;
}
+ public DataDirectory[] getWriteableLocations()
+ {
+ List<DataDirectory> nonBlacklistedDirs = new ArrayList<>();
+ for (DataDirectory dir : dataDirectories)
+ {
+ if (!BlacklistedDirectories.isUnwritable(dir.location))
+ nonBlacklistedDirs.add(dir);
+ }
+
+ Collections.sort(nonBlacklistedDirs, new Comparator<DataDirectory>()
+ {
+ @Override
+ public int compare(DataDirectory o1, DataDirectory o2)
+ {
+ return o1.location.compareTo(o2.location);
+ }
+ });
+ return nonBlacklistedDirs.toArray(new DataDirectory[nonBlacklistedDirs.size()]);
+ }
+
public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
{
return getSnapshotDirectory(desc.directory, snapshotName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 96b1775..8e7a43c 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.File;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,14 +44,14 @@ import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.MemtablePool;
@@ -250,9 +249,32 @@ public class Memtable implements Comparable<Memtable>
return partitions.size();
}
- public FlushRunnable flushRunnable()
+ public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
{
- return new FlushRunnable(lastReplayPosition.get());
+ List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
+
+ if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
+ return Collections.singletonList(new FlushRunnable(lastReplayPosition.get(), txn));
+
+ return createFlushRunnables(localRanges, txn);
+ }
+
+ private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+ {
+ assert cfs.getPartitioner().splitter().isPresent();
+
+ Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+ List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+ List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
+ PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
+ ReplayPosition context = lastReplayPosition.get();
+ for (int i = 0; i < boundaries.size(); i++)
+ {
+ PartitionPosition t = boundaries.get(i);
+ runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn));
+ rangeStart = t;
+ }
+ return runnables;
}
public String toString()
@@ -312,23 +334,41 @@ public class Memtable implements Comparable<Memtable>
return creationTime;
}
- class FlushRunnable extends DiskAwareRunnable
+ class FlushRunnable implements Callable<SSTableMultiWriter>
{
- private final ReplayPosition context;
+ public final ReplayPosition context;
private final long estimatedSize;
+ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
private final boolean isBatchLogTable;
+ private final SSTableMultiWriter writer;
+
+ // keeping these to be able to log what we are actually flushing
+ private final PartitionPosition from;
+ private final PartitionPosition to;
- FlushRunnable(ReplayPosition context)
+ FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
{
- this.context = context;
+ this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
+ }
+
+ FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
+ {
+ this(context, partitions, null, null, null, txn);
+ }
+ FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
+ {
+ this.context = context;
+ this.toFlush = toFlush;
+ this.from = from;
+ this.to = to;
long keySize = 0;
- for (PartitionPosition key : partitions.keySet())
+ for (PartitionPosition key : toFlush.keySet())
{
// make sure we don't write non-sensical keys
assert key instanceof DecoratedKey;
- keySize += ((DecoratedKey)key).getKey().remaining();
+ keySize += ((DecoratedKey) key).getKey().remaining();
}
estimatedSize = (long) ((keySize // index entries
+ keySize // keys in data file
@@ -336,21 +376,12 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
- }
- public long getExpectedWriteSize()
- {
- return estimatedSize;
- }
+ if (flushLocation == null)
+ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(getDirectories().getWriteableLocation(estimatedSize))), columnsCollector.get(), statsCollector.get());
+ else
+ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
- protected void runMayThrow() throws Exception
- {
- long writeSize = getExpectedWriteSize();
- Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
- assert sstableDirectory != null : "Flush task is not bound to any disk";
- Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstables);
}
protected Directories getDirectories()
@@ -358,90 +389,64 @@ public class Memtable implements Comparable<Memtable>
return cfs.getDirectories();
}
- private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
+ private void writeSortedContents(ReplayPosition context)
{
- logger.debug("Writing {}", Memtable.this.toString());
+ logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : toFlush.values())
{
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
- {
- writer.append(iter);
- }
+ writer.append(iter);
}
}
+ }
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- context));
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ context));
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), context);
- writer.abort();
- ssTables = null;
- }
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString()));
+ }
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
+ String filename,
+ PartitionColumns columns,
+ EncodingStats stats)
+ {
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
+ return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+ (long)toFlush.size(),
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ sstableMetadataCollector,
+ new SerializationHeader(true, cfs.metadata, columns, stats), txn);
- return ssTables;
- }
}
- @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
- public SSTableTxnWriter createFlushWriter(String filename,
- PartitionColumns columns,
- EncodingStats stats)
+ @Override
+ public SSTableMultiWriter call()
{
- // we operate "offline" here, as we expose the resulting reader consciously when done
- // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
- LifecycleTransaction txn = null;
- try
- {
- txn = LifecycleTransaction.offline(OperationType.FLUSH);
- MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
- return new SSTableTxnWriter(txn,
- cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
- (long) partitions.size(),
- ActiveRepairService.UNREPAIRED_SSTABLE,
- sstableMetadataCollector,
- new SerializationHeader(true, cfs.metadata, columns, stats),
- txn));
- }
- catch (Throwable t)
- {
- if (txn != null)
- txn.close();
- throw t;
- }
+ writeSortedContents(context);
+ return writer;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 571088b..1f39767 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
+import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
@@ -270,7 +271,7 @@ public class CompactionManager implements CompactionManagerMBean
Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
if (Iterables.isEmpty(sstables))
{
- logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+ logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
return AllSSTableOpStatus.SUCCESSFUL;
}
@@ -432,6 +433,77 @@ public class CompactionManager implements CompactionManagerMBean
}
}, OperationType.CLEANUP);
}
+ /**
+ * Rewrites the sstables of the given table that are not stored in the data directory that should own them.
+ * An sstable counts as correctly placed when it lives under the directory chosen for it by
+ * CompactionStrategyManager.getCompactionStrategyIndex and its last key does not exceed that directory's
+ * disk boundary.
+ *
+ * @param cfs the table whose sstables should be relocated
+ * @return ABORTED if the partitioner cannot split token ranges or the node has no local ranges yet
+ * (it has not joined the ring); otherwise the status of the parallel per-sstable relocation
+ */
+ public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs) throws ExecutionException, InterruptedException
+ {
+ if (!cfs.getPartitioner().splitter().isPresent())
+ {
+ logger.info("Partitioner does not support splitting");
+ return AllSSTableOpStatus.ABORTED;
+ }
+ final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());
+
+ if (r.isEmpty())
+ {
+ logger.info("Relocate cannot run before a node has joined the ring");
+ return AllSSTableOpStatus.ABORTED;
+ }
+
+ final List<Range<Token>> localRanges = Range.sort(r);
+ final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+ final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+
+ return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
+ {
+ @Override
+ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
+ {
+ // only sstables on the wrong disk are relocated; the correctly placed ones are cancelled out of the transaction
+ Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
+ Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
+ transaction.cancel(Sets.difference(originals, needsRelocation));
+
+ // group the remaining sstables by the index of their target data directory
+ Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
+ CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
+
+ // interleave the groups round-robin (one sstable per target directory at a time) so consecutive entries
+ // target different disks - presumably to spread relocation I/O across disks; confirm intent
+ int maxSize = 0;
+ for (List<SSTableReader> diskSSTables : groupedByDisk.values())
+ maxSize = Math.max(maxSize, diskSSTables.size());
+
+ List<SSTableReader> mixedSSTables = new ArrayList<>();
+
+ for (int i = 0; i < maxSize; i++)
+ for (List<SSTableReader> diskSSTables : groupedByDisk.values())
+ if (i < diskSSTables.size())
+ mixedSSTables.add(diskSSTables.get(i));
+
+ return mixedSSTables;
+ }
+
+ private boolean inCorrectLocation(SSTableReader sstable)
+ {
+ if (!cfs.getPartitioner().splitter().isPresent())
+ return true;
+ int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+ // NOTE(review): this local shadows the 'locations' array captured by relocateSSTables and re-reads the
+ // writeable locations, while diskBoundaries was computed from the captured array; if a disk is blacklisted
+ // in between, directoryIndex may not line up with diskBoundaries - confirm this is acceptable
+ Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+
+ Directories.DataDirectory location = locations[directoryIndex];
+ PartitionPosition diskLast = diskBoundaries.get(directoryIndex);
+ // the location we get from directoryIndex is based on the first key in the sstable
+ // now we need to make sure the last key is less than the boundary as well:
+ return sstable.descriptor.directory.getAbsolutePath().startsWith(location.location.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0;
+ }
+
+ @Override
+ public void execute(LifecycleTransaction txn) throws IOException
+ {
+ logger.debug("Relocating {}", txn.originals());
+ // rewrite the sstables through a regular, user-defined compaction task tagged as RELOCATE
+ AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
+ task.setUserDefined(true);
+ task.setCompactionType(OperationType.RELOCATE);
+ task.execute(metrics);
+ }
+ }, OperationType.RELOCATE);
+ }
+
public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
final Collection<Range<Token>> ranges,
final Refs<SSTableReader> sstables,
@@ -878,9 +950,7 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Cleaning up {}", sstable);
- File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
- if (compactionFileLocation == null)
- throw new IOException("disk full");
+ // write the cleaned-up sstable into the original sstable's directory so it stays on the same disk.
+ // NOTE(review): the former free-space ("disk full") check is gone; confirm the disk cannot run out of space here
+ File compactionFileLocation = sstable.descriptor.directory;
List<SSTableReader> finished;
int nowInSec = FBUtilities.nowInSeconds();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 7c7e86a..067a0c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
import org.apache.cassandra.index.Index;
+import com.google.common.primitives.Ints;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
@@ -43,19 +47,21 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
/**
* Manages the compaction strategies.
*
- * Currently has two instances of actual compaction strategies - one for repaired data and one for
+ * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
* unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ * When the partitioner cannot split token ranges, a single repaired/unrepaired pair handles all sstables.
*/
+
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
private final ColumnFamilyStore cfs;
- private volatile AbstractCompactionStrategy repaired;
- private volatile AbstractCompactionStrategy unrepaired;
+ // element i is the strategy for the sstables that getCompactionStrategyIndex assigns to writeable directory i
+ private volatile List<AbstractCompactionStrategy> repaired = new ArrayList<>();
+ private volatile List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
private volatile boolean enabled = true;
public boolean isActive = true;
private volatile CompactionParams params;
@@ -67,6 +73,7 @@ public class CompactionStrategyManager implements INotificationConsumer
we will use the new compaction parameters.
*/
private CompactionParams schemaCompactionParams;
+ // snapshot of the writeable data directories; maybeReload compares it with the current writeable locations
+ // and reloads the strategies on a mismatch (e.g. a disk was blacklisted)
+ private Directories.DataDirectory[] locations;
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
@@ -75,6 +82,7 @@ public class CompactionStrategyManager implements INotificationConsumer
this.cfs = cfs;
reload(cfs.metadata);
params = cfs.metadata.params.compaction;
+ // remember the writeable locations the freshly created strategies correspond to
+ locations = getDirectories().getWriteableLocations();
enabled = params.isEnabled();
}
@@ -91,20 +99,17 @@ public class CompactionStrategyManager implements INotificationConsumer
maybeReload(cfs.metadata);
- if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
- {
- AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
- if (repairedTask != null)
- return repairedTask;
- return unrepaired.getNextBackgroundTask(gcBefore);
- }
- else
+ // consider every strategy (repaired and unrepaired, for every data directory), busiest first: sort by
+ // estimated remaining tasks in descending order and return the first task any strategy offers
+ List<AbstractCompactionStrategy> strategies = new ArrayList<>(repaired.size() + unrepaired.size());
+ strategies.addAll(repaired);
+ strategies.addAll(unrepaired);
+ Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
+ for (AbstractCompactionStrategy strategy : strategies)
{
- AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
- if (unrepairedTask != null)
- return unrepairedTask;
- return repaired.getNextBackgroundTask(gcBefore);
+ AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
+ if (task != null)
+ return task;
}
+ // no strategy has background work to do
+ return null;
}
public boolean isEnabled()
@@ -135,36 +140,78 @@ public class CompactionStrategyManager implements INotificationConsumer
if (sstable.openReason != SSTableReader.OpenReason.EARLY)
getCompactionStrategyFor(sstable).addSSTable(sstable);
}
- repaired.startup();
- unrepaired.startup();
+ repaired.forEach(AbstractCompactionStrategy::startup);
+ unrepaired.forEach(AbstractCompactionStrategy::startup);
}
/**
* return the compaction strategy for the given sstable
*
- * returns differently based on the repaired status
+ * returns differently based on the repaired status and on the data directory the sstable is assigned to
+ * (see getCompactionStrategyIndex)
* @param sstable
* @return
*/
private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
+ int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
- return repaired;
+ return repaired.get(index);
else
- return unrepaired;
+ return unrepaired.get(index);
+ }
+
+ /**
+ * Get the index of the compaction strategy (and of the writeable data directory) that should own the given
+ * sstable. When the disk boundaries are known, this is the disk whose boundaries contain the sstable's first key.
+ *
+ * In the case we are upgrading, the first compaction strategy will get most files - we do not care about which disk
+ * the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
+ * sstables in the correct locations and give them to the correct compaction strategy instance.
+ *
+ * @param cfs the table the sstable belongs to
+ * @param locations the table's directories; their current writeable locations are used
+ * @param sstable the sstable to place
+ * @return the index into the per-directory strategy lists; always 0 when the partitioner cannot split token ranges
+ */
+ public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable)
+ {
+ if (!cfs.getPartitioner().splitter().isPresent())
+ return 0;
+
+ // take one snapshot of the writeable locations so the boundaries and the fallback loop see the same disks
+ Directories.DataDirectory[] directories = locations.getWriteableLocations();
+
+ List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories);
+ if (boundaries == null)
+ {
+ // no disk boundaries (presumably the local tokens are not known yet):
+ // try to figure out location based on sstable directory
+ for (int i = 0; i < directories.length; i++)
+ {
+ Directories.DataDirectory directory = directories[i];
+ if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+ return i;
+ }
+ return 0;
+ }
+
+ int pos = Collections.binarySearch(boundaries, sstable.first);
+ // the boundaries are min/max key bounds, which never compare equal to an sstable's first key, so binarySearch
+ // always reports an insertion point encoded as -(insertionPoint) - 1
+ assert pos < 0;
+ return -pos - 1;
}
public void shutdown()
{
isActive = false;
- repaired.shutdown();
- unrepaired.shutdown();
+ repaired.forEach(AbstractCompactionStrategy::shutdown);
+ unrepaired.forEach(AbstractCompactionStrategy::shutdown);
}
public synchronized void maybeReload(CFMetaData metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
- if (metadata.params.compaction.equals(schemaCompactionParams))
+ if (metadata.params.compaction.equals(schemaCompactionParams) &&
+ Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
return;
reload(metadata);
}
@@ -178,6 +225,11 @@ public class CompactionStrategyManager implements INotificationConsumer
public synchronized void reload(CFMetaData metadata)
{
boolean disabledWithJMX = !enabled && shouldBeEnabled();
+ // trace why the strategies are being recreated, if the reason is one of the two checked by maybeReload
+ if (!metadata.params.compaction.equals(schemaCompactionParams))
+ logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+ else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
+ logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+
setStrategy(metadata.params.compaction);
schemaCompactionParams = metadata.params.compaction;
@@ -197,11 +249,13 @@ public class CompactionStrategyManager implements INotificationConsumer
public int getUnleveledSSTables()
{
- if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ // every instance is created from the same compaction params, so inspecting the first of each list is enough
+ if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
{
int count = 0;
- count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
- count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+ // sum the level-0 sizes over every per-directory instance
+ for (AbstractCompactionStrategy strategy : repaired)
+ count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
return count;
}
return 0;
@@ -209,13 +263,19 @@ public class CompactionStrategyManager implements INotificationConsumer
public synchronized int[] getSSTableCountPerLevel()
{
- if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ // every instance is created from the same compaction params, so inspecting the first of each list is enough
+ if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
{
int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
- int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
- res = sumArrays(res, repairedCountPerLevel);
- int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
- res = sumArrays(res, unrepairedCountPerLevel);
+ // accumulate the per-level counts of every per-directory instance
+ for (AbstractCompactionStrategy strategy : repaired)
+ {
+ int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+ res = sumArrays(res, repairedCountPerLevel);
+ }
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ {
+ int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+ res = sumArrays(res, unrepairedCountPerLevel);
+ }
return res;
}
return null;
@@ -238,103 +298,112 @@ public class CompactionStrategyManager implements INotificationConsumer
public boolean shouldDefragment()
{
- assert repaired.getClass().equals(unrepaired.getClass());
- return repaired.shouldDefragment();
+ // every instance is created from the same compaction params, so the first repaired one is representative
+ assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+ return repaired.get(0).shouldDefragment();
}
public Directories getDirectories()
{
- assert repaired.getClass().equals(unrepaired.getClass());
- return repaired.getDirectories();
+ // presumably every instance reports the same Directories (all are created for the same cfs) - confirm
+ assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+ return repaired.get(0).getDirectories();
}
public synchronized void handleNotification(INotification notification, Object sender)
{
+ // recreate the strategies first if the compaction params or the writeable disks changed, so that the
+ // per-directory indexes computed below line up with the current strategy lists
+ maybeReload(cfs.metadata);
if (notification instanceof SSTableAddedNotification)
{
SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
for (SSTableReader sstable : flushedNotification.added)
- {
- if (sstable.isRepaired())
- repaired.addSSTable(sstable);
- else
- unrepaired.addSSTable(sstable);
- }
+ getCompactionStrategyFor(sstable).addSSTable(sstable);
}
else if (notification instanceof SSTableListChangedNotification)
{
+ // a bit of gymnastics to be able to replace sstables in compaction strategies
+ // we use this to know that a compaction finished and where to start the next compaction in LCS
SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
- Set<SSTableReader> repairedRemoved = new HashSet<>();
- Set<SSTableReader> repairedAdded = new HashSet<>();
- Set<SSTableReader> unrepairedRemoved = new HashSet<>();
- Set<SSTableReader> unrepairedAdded = new HashSet<>();
+
+ // one bucket per strategy instance: one per writeable disk, or a single one when the partitioner cannot split.
+ // NOTE(review): this local shadows the 'locations' field of the same type
+ Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
+ int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+
+ List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+ for (int i = 0; i < locationSize; i++)
+ {
+ repairedRemoved.add(new HashSet<>());
+ repairedAdded.add(new HashSet<>());
+ unrepairedRemoved.add(new HashSet<>());
+ unrepairedAdded.add(new HashSet<>());
+ }
for (SSTableReader sstable : listChangedNotification.removed)
{
+ int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
- repairedRemoved.add(sstable);
+ repairedRemoved.get(i).add(sstable);
else
- unrepairedRemoved.add(sstable);
+ unrepairedRemoved.get(i).add(sstable);
}
for (SSTableReader sstable : listChangedNotification.added)
{
+ int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
- repairedAdded.add(sstable);
+ repairedAdded.get(i).add(sstable);
else
- unrepairedAdded.add(sstable);
- }
- if (!repairedRemoved.isEmpty())
- {
- repaired.replaceSSTables(repairedRemoved, repairedAdded);
- }
- else
- {
- for (SSTableReader sstable : repairedAdded)
- repaired.addSSTable(sstable);
+ unrepairedAdded.get(i).add(sstable);
}
- if (!unrepairedRemoved.isEmpty())
+ // apply the buckets per directory: replace when sstables were removed (a finished compaction), otherwise only add
+ for (int i = 0; i < locationSize; i++)
{
- unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
- }
- else
- {
- for (SSTableReader sstable : unrepairedAdded)
- unrepaired.addSSTable(sstable);
+ if (!repairedRemoved.get(i).isEmpty())
+ repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
+ else
+ {
+ for (SSTableReader sstable : repairedAdded.get(i))
+ repaired.get(i).addSSTable(sstable);
+ }
+ if (!unrepairedRemoved.get(i).isEmpty())
+ unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
+ else
+ {
+ for (SSTableReader sstable : unrepairedAdded.get(i))
+ unrepaired.get(i).addSSTable(sstable);
+ }
}
}
else if (notification instanceof SSTableRepairStatusChanged)
{
for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
{
+ // move the sstable between the unrepaired and repaired strategy of the same directory index
+ int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
{
- unrepaired.removeSSTable(sstable);
- repaired.addSSTable(sstable);
+ unrepaired.get(index).removeSSTable(sstable);
+ repaired.get(index).addSSTable(sstable);
}
else
{
- repaired.removeSSTable(sstable);
- unrepaired.addSSTable(sstable);
+ repaired.get(index).removeSSTable(sstable);
+ unrepaired.get(index).addSSTable(sstable);
}
}
}
else if (notification instanceof SSTableDeletingNotification)
{
- SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
- if (sstable.isRepaired())
- repaired.removeSSTable(sstable);
- else
- unrepaired.removeSSTable(sstable);
+ SSTableReader sstable = ((SSTableDeletingNotification) notification).deleting;
+ getCompactionStrategyFor(sstable).removeSSTable(sstable);
}
}
public void enable()
{
if (repaired != null)
- repaired.enable();
+ repaired.forEach(AbstractCompactionStrategy::enable);
if (unrepaired != null)
- unrepaired.enable();
+ unrepaired.forEach(AbstractCompactionStrategy::enable);
// enable this last to make sure the strategies are ready to get calls.
enabled = true;
}
@@ -344,47 +413,64 @@ public class CompactionStrategyManager implements INotificationConsumer
// disable this first avoid asking disabled strategies for compaction tasks
enabled = false;
if (repaired != null)
- repaired.disable();
+ repaired.forEach(AbstractCompactionStrategy::disable);
if (unrepaired != null)
- unrepaired.disable();
+ unrepaired.forEach(AbstractCompactionStrategy::disable);
}
/**
- * Create ISSTableScanner from the given sstables
+ * Create ISSTableScanners from the given sstables
*
* Delegates the call to the compaction strategies to allow LCS to create a scanner
* @param sstables
- * @param range
+ * @param ranges the token ranges to scan; scanners are created separately for each range
+ * and for each per-directory repaired/unrepaired strategy that owns some of the sstables
* @return
*/
@SuppressWarnings("resource")
public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
- List<SSTableReader> repairedSSTables = new ArrayList<>();
- List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+ assert repaired.size() == unrepaired.size();
+ // bucket the sstables by repaired status and by strategy (data directory) index
+ List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+ List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
+
+ for (int i = 0; i < repaired.size(); i++)
+ {
+ repairedSSTables.add(new HashSet<>());
+ unrepairedSSTables.add(new HashSet<>());
+ }
+
for (SSTableReader sstable : sstables)
{
if (sstable.isRepaired())
- repairedSSTables.add(sstable);
+ repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
else
- unrepairedSSTables.add(sstable);
+ unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
}
- Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
+ List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
for (Range<Token> range : ranges)
{
- AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
- AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+ List<ISSTableScanner> repairedScanners = new ArrayList<>();
+ List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
- for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
+ for (int i = 0; i < repairedSSTables.size(); i++)
+ {
+ if (!repairedSSTables.get(i).isEmpty())
+ repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners);
+ }
+ for (int i = 0; i < unrepairedSSTables.size(); i++)
+ {
+ if (!unrepairedSSTables.get(i).isEmpty())
+ unrepairedScanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners);
+ }
+ // collect both kinds of scanners for this range. NOTE: 'scanners' is a List, so add() always succeeds and
+ // the close() branch below can no longer fire (it only mattered when this was a Set)
+ for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners))
{
if (!scanners.add(scanner))
scanner.close();
}
}
-
- return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
+ return new AbstractCompactionStrategy.ScannerList(scanners);
}
public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
@@ -394,21 +480,44 @@ public class CompactionStrategyManager implements INotificationConsumer
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
- return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+ // anticompaction groups never span data directories: bucket by strategy index first, then let the
+ // unrepaired strategy of each directory split its bucket further
+ Directories directories = getDirectories();
+ Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, directories, s)));
+ Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
+
+ for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
+ anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
+ return anticompactionGroups;
}
public long getMaxSSTableBytes()
{
- return unrepaired.getMaxSSTableBytes();
+ // every instance is created from the same compaction params, so the first unrepaired one is representative
+ return unrepaired.get(0).getMaxSSTableBytes();
}
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
+ maybeReload(cfs.metadata);
+ validateForCompaction(txn.originals());
return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
}
+ /**
+ * Checks that the sstables can be compacted together by a single strategy instance: they must all share the
+ * same repaired status and map to the same data directory index.
+ *
+ * @param input the sstables about to be compacted; must not be empty
+ * @throws UnsupportedOperationException if repaired and unrepaired sstables are mixed, or if the sstables map
+ * to different data directories
+ */
+ private void validateForCompaction(Iterable<SSTableReader> input)
+ {
+ SSTableReader firstSSTable = Iterables.getFirst(input, null);
+ assert firstSSTable != null;
+ // named so that it does not shadow the 'repaired' strategy list field
+ boolean firstIsRepaired = firstSSTable.isRepaired();
+ Directories directories = getDirectories();
+ int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
+ for (SSTableReader sstable : input)
+ {
+ if (sstable.isRepaired() != firstIsRepaired)
+ throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+ if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
+ throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+ }
+ }
+
public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
{
+ maybeReload(cfs.metadata);
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
// to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
// sstables are marked the compactions are re-enabled
@@ -419,20 +528,21 @@ public class CompactionStrategyManager implements INotificationConsumer
{
synchronized (CompactionStrategyManager.this)
{
- Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
- Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
-
- if (repairedTasks == null && unrepairedTasks == null)
- return null;
-
- if (repairedTasks == null)
- return unrepairedTasks;
- if (unrepairedTasks == null)
- return repairedTasks;
-
List<AbstractCompactionTask> tasks = new ArrayList<>();
- tasks.addAll(repairedTasks);
- tasks.addAll(unrepairedTasks);
+ // collect the maximal tasks of every repaired and unrepaired per-directory strategy
+ for (AbstractCompactionStrategy strategy : repaired)
+ {
+ Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
+ if (task != null)
+ tasks.addAll(task);
+ }
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ {
+ Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
+ if (task != null)
+ tasks.addAll(task);
+ }
+ // keep the previous contract: null when no strategy produced any task
+ if (tasks.isEmpty())
+ return null;
return tasks;
}
}
@@ -441,14 +551,18 @@ public class CompactionStrategyManager implements INotificationConsumer
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
+ // user-defined compactions must also stay within one repaired status and one data directory
+ maybeReload(cfs.metadata);
+ validateForCompaction(sstables);
return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
}
public int getEstimatedRemainingTasks()
{
int tasks = 0;
- tasks += repaired.getEstimatedRemainingTasks();
- tasks += unrepaired.getEstimatedRemainingTasks();
+ // total over every per-directory repaired and unrepaired strategy instance
+ for (AbstractCompactionStrategy strategy : repaired)
+ tasks += strategy.getEstimatedRemainingTasks();
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ tasks += strategy.getEstimatedRemainingTasks();
return tasks;
}
@@ -460,10 +574,10 @@ public class CompactionStrategyManager implements INotificationConsumer
public String getName()
{
- return unrepaired.getName();
+ // every instance is created from the same compaction params, so the first one names the strategy
+ return unrepaired.get(0).getName();
}
- public List<AbstractCompactionStrategy> getStrategies()
+ /**
+ * @return a two-element list: the repaired strategies followed by the unrepaired strategies, each indexed by
+ * data directory. NOTE: these are the live internal lists, not copies.
+ */
+ public List<List<AbstractCompactionStrategy>> getStrategies()
{
return Arrays.asList(repaired, unrepaired);
}
@@ -481,12 +595,25 @@ public class CompactionStrategyManager implements INotificationConsumer
+ /**
+ * Shuts down the current strategy instances and creates new ones for the given params: one repaired/unrepaired
+ * pair per writeable data directory when the partitioner can split token ranges, otherwise a single pair.
+ */
private void setStrategy(CompactionParams params)
{
- if (repaired != null)
- repaired.shutdown();
- if (unrepaired != null)
- unrepaired.shutdown();
- repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
- unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
+ repaired.forEach(AbstractCompactionStrategy::shutdown);
+ unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+ repaired.clear();
+ unrepaired.clear();
+
+ // always refresh the snapshot that maybeReload compares against: if it were only updated for splittable
+ // partitioners, any disk change would make maybeReload recreate the strategies on every call
+ locations = cfs.getDirectories().getWriteableLocations();
+ int strategyCount = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+ for (int i = 0; i < strategyCount; i++)
+ {
+ repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ }
this.params = params;
}
@@ -510,11 +637,11 @@ public class CompactionStrategyManager implements INotificationConsumer
{
+ // NOTE(review): the writer always comes from the first instance, whichever disk the descriptor points at;
+ // presumably fine because every instance is created from the same params - confirm
if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
- return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+ return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
}
else
{
- return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+ return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index be81c80..6b9fe21 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -230,7 +230,7 @@ public class CompactionTask extends AbstractCompactionTask
LifecycleTransaction transaction,
Set<SSTableReader> nonExpiredSSTables)
{
- return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
+ // NOTE(review): this task's getLevel() is now forwarded to the writer; presumably the level assigned to the
+ // output sstables - confirm
+ return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals, getLevel());
}
public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 50f9b71..2dc6ee8 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -85,7 +85,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
*/
private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
- if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+ // look only at the sstables tracked by this strategy instance, not at every live sstable of the table
+ if (sstables.isEmpty())
return Collections.emptyList();
Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
@@ -212,6 +212,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
{
sstables.remove(sstable);
}
+
/**
* A target time span used for bucketing SSTables based on timestamps.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index b7bf83f..c54b751 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -355,7 +355,7 @@ public class LeveledManifest
Collection<SSTableReader> candidates = getCandidatesFor(0);
if (candidates.isEmpty())
return null;
- return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategyManager().getMaxSSTableBytes());
+ // use this manifest's own max sstable size instead of querying the table's CompactionStrategyManager
+ return new CompactionCandidate(candidates, getNextLevel(candidates), maxSSTableSizeInBytes);
}
private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 20e6df2..84a34c9 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -37,7 +37,8 @@ public enum OperationType
STREAM("Stream"),
WRITE("Write"),
VIEW_BUILD("View build"),
- INDEX_SUMMARY("Index summary redistribution");
+ INDEX_SUMMARY("Index summary redistribution"),
+ // used by CompactionManager.relocateSSTables to move sstables to the data directory that should own them
+ RELOCATE("Relocate sstables to correct disk");
public final String type;
public final String fileName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 272c2f8..838b0a1 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -104,11 +104,8 @@ public class Scrubber implements Closeable
List<SSTableReader> toScrub = Collections.singletonList(sstable);
- // Calculate the expected compacted filesize
- this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
- if (destination == null)
- throw new IOException("disk full");
-
+ int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+ this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
this.isCommutative = cfs.metadata.isCounter();
boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index f8a8240..e36adf2 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
logger.trace("Compaction buckets are {}", buckets);
- updateEstimatedCompactionsByTasks(buckets);
+ estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets);
List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
if (!mostInteresting.isEmpty())
return mostInteresting;
@@ -282,15 +283,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return new ArrayList<List<T>>(buckets.values());
}
- private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks)
+ /**
+ * Estimates how many compaction tasks remain for the given buckets.
+ *
+ * Buckets holding fewer sstables than the minimum compaction threshold of {@code cfs} contribute nothing;
+ * every other bucket contributes ceil(bucket size / maximum compaction threshold) tasks.
+ *
+ * @param cfs the column family store whose min/max compaction thresholds are applied
+ * @param tasks the candidate buckets of sstables
+ * @return the estimated number of remaining compaction tasks
+ */
+ public static int getEstimatedCompactionsByTasks(ColumnFamilyStore cfs, List<List<SSTableReader>> tasks)
{
int n = 0;
- for (List<SSTableReader> bucket: tasks)
+ for (List<SSTableReader> bucket : tasks)
{
if (bucket.size() >= cfs.getMinimumCompactionThreshold())
n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
}
- estimatedRemainingTasks = n;
+ return n;
}
public long getMaxSSTableBytes()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 0b3b7d0..46023ce 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -18,18 +18,26 @@
package org.apache.cassandra.db.compaction.writers;
+import java.io.File;
import java.util.Collection;
+import java.util.List;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.concurrent.Transactional;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.service.StorageService;
/**
@@ -38,6 +46,8 @@ import org.apache.cassandra.utils.concurrent.Transactional;
*/
public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
{
+ // Shared with all writer subclasses; named after this base class so log output is attributed to it
+ // rather than to one particular subclass.
+ protected static final Logger logger = LoggerFactory.getLogger(CompactionAwareWriter.class);
+
protected final ColumnFamilyStore cfs;
protected final Directories directories;
protected final Set<SSTableReader> nonExpiredSSTables;
@@ -45,9 +55,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final long maxAge;
protected final long minRepairedAt;
- protected final LifecycleTransaction txn;
protected final SSTableRewriter sstableWriter;
- private boolean isInitialized = false;
+ protected final LifecycleTransaction txn;
+ private final Directories.DataDirectory[] locations;
+ private final List<PartitionPosition> diskBoundaries;
+ private int locationIndex;
public CompactionAwareWriter(ColumnFamilyStore cfs,
Directories directories,
@@ -59,12 +71,15 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
this.cfs = cfs;
this.directories = directories;
this.nonExpiredSSTables = nonExpiredSSTables;
- this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
- this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
- this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
this.txn = txn;
- this.sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
+ estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
+ maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
+ sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
+ minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
+ locations = cfs.getDirectories().getWriteableLocations();
+ diskBoundaries = StorageService.getDiskBoundaries(cfs);
+ locationIndex = -1;
}
@Override
@@ -96,6 +111,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
return sstableWriter.finished();
}
+ /**
+ * Completes the write, given a repairedAt value, and returns a list of sstables - presumably the ones
+ * produced by this writer.
+ * NOTE(review): exact semantics are left to subclasses; confirm against their implementations.
+ *
+ * @param repairedAt the repairedAt value to apply
+ * @return the resulting sstables
+ */
+ public abstract List<SSTableReader> finish(long repairedAt);
+
/**
* estimated number of keys we should write
*/
@@ -104,6 +121,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
return estimatedTotalKeys;
}
+ /**
+ * Writes a partition in an implementation specific way
+ * @param partition the partition to append
+ * @return true if the partition was written, false otherwise
+ */
public final boolean append(UnfilteredRowIterator partition)
{
maybeSwitchWriter(partition.partitionKey());
@@ -125,9 +147,26 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
*/
protected void maybeSwitchWriter(DecoratedKey key)
{
- if (!isInitialized)
- switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
- isInitialized = true;
+ // No per-disk token boundaries: choose a single output directory once, on the first key.
+ if (diskBoundaries == null)
+ {
+ if (locationIndex < 0)
+ {
+ // Estimate the output size using the transaction's actual operation type (as the replaced code did)
+ // so operation-specific size estimates are not lost before the free-space check.
+ Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
+ switchCompactionLocation(defaultLocation);
+ locationIndex = 0;
+ }
+ return;
+ }
+
+ // Key is still below the current disk's boundary: keep writing to the current location.
+ if (locationIndex > -1 && key.compareTo(diskBoundaries.get(locationIndex)) < 0)
+ return;
+
+ // Advance to the first disk whose boundary is not below the key and switch to it.
+ // locationIndex only ever increases, so this relies on keys being appended in sorted order.
+ // NOTE(review): a key above the last boundary would run off the end of diskBoundaries; presumably the
+ // last boundary covers the rest of the ring - confirm in StorageService.getDiskBoundaries.
+ int prevIdx = locationIndex;
+ while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
+ locationIndex++;
+ if (prevIdx >= 0)
+ logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]);
+ switchCompactionLocation(locations[locationIndex]);
}
/**
@@ -148,13 +187,37 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
/**
- * Return a directory where we can expect expectedWriteSize to fit.
+ * Return the data directory that the compaction output should be written to.
+ *
+ * If the input sstables map to a known data directory (the first sstable's directory is used when they
+ * are spread over several disks), that directory is returned, provided it has room for
+ * {@code estimatedWriteSize} bytes. Otherwise any writeable location with enough space is chosen.
+ *
+ * @param sstables the sstables to compact
+ * @param estimatedWriteSize the expected size in bytes of the compaction output
+ * @return the data directory to write to; never null
+ * @throws RuntimeException if the input's data directory, or every writeable location, lacks the space
*/
- public Directories.DataDirectory getWriteDirectory(long expectedWriteSize)
+ public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize)
{
- Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize);
- if (directory == null)
- throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes");
+ // Keep the output on the disk the input came from, so tokens are not spread over several data directories.
+ File directory = null;
+ for (SSTableReader sstable : sstables)
+ {
+ File sstableDirectory = sstable.descriptor.directory;
+ if (directory == null)
+ {
+ directory = sstableDirectory;
+ }
+ else if (!directory.equals(sstableDirectory))
+ {
+ // Log once; the remaining sstables cannot change the chosen directory.
+ logger.trace("All sstables not from the same disk - putting results in {}", directory);
+ break;
+ }
+ }
+ // With no input sstables there is no directory to map, so go straight to the fallback below.
+ Directories.DataDirectory d = directory == null ? null : getDirectories().getDataDirectoryForFile(directory);
+ if (d != null)
+ {
+ // Read the free space once so the check and the error message agree.
+ long availableSpace = d.getAvailableSpace();
+ if (availableSpace < estimatedWriteSize)
+ throw new RuntimeException(String.format("Not enough space to write %d bytes to %s (%d bytes available)", estimatedWriteSize, d.location, availableSpace));
+ logger.trace("putting compaction results in {}", directory);
+ return d;
+ }
+ // Input directory unknown: fall back to any writeable location with enough room.
+ d = getDirectories().getWriteableLocation(estimatedWriteSize);
+ if (d == null)
+ throw new RuntimeException("Not enough disk space to store "+estimatedWriteSize+" bytes");
+ return d;
+ }
- return directory;
+ /**
+ * Sets the repairedAt value on the underlying sstable rewriter.
+ *
+ * @param repairedAt the repairedAt value to record
+ * @return this writer, for call chaining
+ */
+ public CompactionAwareWriter setRepairedAt(long repairedAt)
+ {
+ this.sstableWriter.setRepairedAt(repairedAt);
+ return this;
}
}