Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/01/05 16:55:46 UTC
[1/2] cassandra git commit: Make sure the same token does not exist in several data directories
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.2 156829340 -> e2c634189
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 8b4351f..5e78834 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compaction.writers;
+import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
@@ -39,16 +40,18 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
public class DefaultCompactionWriter extends CompactionAwareWriter
{
protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
+ private final int sstableLevel;
public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
- this(cfs, directories, txn, nonExpiredSSTables, false, false);
+ this(cfs, directories, txn, nonExpiredSSTables, false, false, 0);
}
@SuppressWarnings("resource")
- public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+ public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals, int sstableLevel)
{
super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+ this.sstableLevel = sstableLevel;
}
@Override
@@ -58,14 +61,14 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
}
@Override
- protected void switchCompactionLocation(Directories.DataDirectory directory)
+ public void switchCompactionLocation(Directories.DataDirectory directory)
{
@SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
estimatedTotalKeys,
minRepairedAt,
cfs.metadata,
- new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
+ new MetadataCollector(txn.originals(), cfs.metadata.comparator, sstableLevel),
SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
cfs.indexManager.listIndexes(),
txn);
@@ -73,6 +76,12 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
}
@Override
+ public List<SSTableReader> finish(long repairedAt)
+ {
+ return sstableWriter.setRepairedAt(repairedAt).finish();
+ }
+
+ @Override
public long estimatedKeys()
{
return estimatedTotalKeys;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index b0c4562..0c88ac6 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -17,12 +17,9 @@
*/
package org.apache.cassandra.db.compaction.writers;
-import java.io.File;
+import java.util.List;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.RowIndexEntry;
@@ -37,15 +34,14 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
public class MajorLeveledCompactionWriter extends CompactionAwareWriter
{
- private static final Logger logger = LoggerFactory.getLogger(MajorLeveledCompactionWriter.class);
private final long maxSSTableSize;
- private final long expectedWriteSize;
- private final Set<SSTableReader> allSSTables;
private int currentLevel = 1;
private long averageEstimatedKeysPerSSTable;
private long partitionsWritten = 0;
private long totalWrittenInLevel = 0;
private int sstablesWritten = 0;
+ private final long keysPerSSTable;
+ private Directories.DataDirectory sstableDirectory;
public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
Directories directories,
@@ -67,8 +63,8 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
{
super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
this.maxSSTableSize = maxSSTableSize;
- this.allSSTables = txn.originals();
- expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
+ long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
+ keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
}
@Override
@@ -86,28 +82,33 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
totalWrittenInLevel = 0;
currentLevel++;
}
-
- averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
- switchCompactionLocation(getWriteDirectory(expectedWriteSize));
- partitionsWritten = 0;
- sstablesWritten++;
+ switchCompactionLocation(sstableDirectory);
}
return rie != null;
}
- public void switchCompactionLocation(Directories.DataDirectory directory)
+ @Override
+ public void switchCompactionLocation(Directories.DataDirectory location)
+ {
+ this.sstableDirectory = location;
+ averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
+ sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
+ keysPerSSTable,
+ minRepairedAt,
+ cfs.metadata,
+ new MetadataCollector(txn.originals(), cfs.metadata.comparator, currentLevel),
+ SerializationHeader.make(cfs.metadata, txn.originals()),
+ cfs.indexManager.listIndexes(),
+ txn));
+ partitionsWritten = 0;
+ sstablesWritten = 0;
+
+ }
+
+ @Override
+ public List<SSTableReader> finish(long repairedAt)
{
- File sstableDirectory = getDirectories().getLocationForDisk(directory);
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
- averageEstimatedKeysPerSSTable,
- minRepairedAt,
- cfs.metadata,
- new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
- SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
- cfs.indexManager.listIndexes(),
- txn);
- sstableWriter.switchWriter(writer);
+ return sstableWriter.setRepairedAt(repairedAt).finish();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 1dc72e7..ac83cc6 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.db.compaction.writers;
+import java.io.File;
+import java.util.List;
import java.util.Set;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -33,11 +35,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
public class MaxSSTableSizeWriter extends CompactionAwareWriter
{
private final long estimatedTotalKeys;
- private final long expectedWriteSize;
private final long maxSSTableSize;
private final int level;
private final long estimatedSSTables;
private final Set<SSTableReader> allSSTables;
+ private Directories.DataDirectory sstableDirectory;
public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
Directories directories,
@@ -63,25 +65,25 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
this.allSSTables = txn.originals();
this.level = level;
this.maxSSTableSize = maxSSTableSize;
- long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
- expectedWriteSize = Math.min(maxSSTableSize, totalSize);
estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
}
- @Override
- public boolean realAppend(UnfilteredRowIterator partition)
+ protected boolean realAppend(UnfilteredRowIterator partition)
{
RowIndexEntry rie = sstableWriter.append(partition);
if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
- switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+ {
+ switchCompactionLocation(sstableDirectory);
+ }
return rie != null;
}
+ @Override
public void switchCompactionLocation(Directories.DataDirectory location)
{
- @SuppressWarnings("resource")
- SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
+ sstableDirectory = location;
+ SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
estimatedTotalKeys / estimatedSSTables,
minRepairedAt,
cfs.metadata,
@@ -91,7 +93,11 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
txn);
sstableWriter.switchWriter(writer);
+ }
+ public List<SSTableReader> finish(long repairedAt)
+ {
+ return sstableWriter.setRepairedAt(repairedAt).finish();
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 3a7f526..46183dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -17,8 +17,8 @@
*/
package org.apache.cassandra.db.compaction.writers;
-import java.io.File;
import java.util.Arrays;
+import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
@@ -51,6 +51,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
private final Set<SSTableReader> allSSTables;
private long currentBytesToWrite;
private int currentRatioIndex = 0;
+ private Directories.DataDirectory location;
public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
{
@@ -82,10 +83,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
}
}
ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
- long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
- switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
- logger.trace("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
}
@Override
@@ -96,15 +94,17 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
{
currentRatioIndex++;
currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
- switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
+ switchCompactionLocation(location);
+ logger.debug("Switching writer, currentBytesToWrite = {}", currentBytesToWrite);
}
return rie != null;
}
+ @Override
public void switchCompactionLocation(Directories.DataDirectory location)
{
+ this.location = location;
long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
- @SuppressWarnings("resource")
SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
currentPartitionsToWrite,
minRepairedAt,
@@ -115,6 +115,11 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
txn);
logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
sstableWriter.switchWriter(writer);
+ }
+ @Override
+ public List<SSTableReader> finish(long repairedAt)
+ {
+ return sstableWriter.setRepairedAt(repairedAt).finish();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c09d49c..4c73472 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -327,10 +327,10 @@ public class Tracker
apply(View.markFlushing(memtable));
}
- public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
+ public void replaceFlushed(Memtable memtable, Iterable<SSTableReader> sstables)
{
assert !isDummy();
- if (sstables == null || sstables.isEmpty())
+ if (sstables == null || Iterables.isEmpty(sstables))
{
// sstable may be null if we flushed batchlog and nothing needed to be retained
// if it's null, we don't care what state the cfstore is in, we just replace it and continue
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index b62c7e3..63926ed 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -310,7 +310,7 @@ public class View
}
// called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
- static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed)
+ static Function<View, View> replaceFlushed(final Memtable memtable, final Iterable<SSTableReader> flushed)
{
return new Function<View, View>()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index e0a08dc..b559a6f 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -49,6 +50,17 @@ public interface IPartitioner
public Token getMinimumToken();
/**
+ * The biggest token for this partitioner. Unlike getMinimumToken, this token is actually used, and users wanting to
+ * include all tokens need to use getMaximumToken().maxKeyBound().
+ *
+ * Not implemented for the ordered partitioners.
+ */
+ default Token getMaximumToken()
+ {
+ throw new UnsupportedOperationException("If you are using a splitting partitioner, getMaximumToken has to be implemented");
+ }
+
+ /**
* @return a Token that can be used to route a given key
* (This is NOT a method to create a Token from its string representation;
* for that, use TokenFactory.fromString.)
@@ -84,4 +96,9 @@ public interface IPartitioner
* Used by secondary indices.
*/
public AbstractType<?> partitionOrdering();
+
+ default Optional<Splitter> splitter()
+ {
+ return Optional.empty();
+ }
}
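
For illustration, a minimal sketch (not part of this patch) of how callers are expected to use the two new default methods; it assumes Murmur3Partitioner, which overrides both splitter() and getMaximumToken() further down in this commit:

    import java.util.Optional;
    import org.apache.cassandra.db.PartitionPosition;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Splitter;

    public class SplitterLookupSketch
    {
        public static void main(String[] args)
        {
            IPartitioner partitioner = Murmur3Partitioner.instance;
            // Only splitting partitioners provide a Splitter; the default returns Optional.empty().
            Optional<Splitter> splitter = partitioner.splitter();
            if (splitter.isPresent())
            {
                // To include every token, callers use the maximum token's upper key bound.
                PartitionPosition upperBound = partitioner.getMaximumToken().maxKeyBound();
                System.out.println("upper bound: " + upperBound);
            }
        }
    }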
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index d68be3f..f9f6113 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -48,6 +48,19 @@ public class Murmur3Partitioner implements IPartitioner
public static final Murmur3Partitioner instance = new Murmur3Partitioner();
public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
+ private final Splitter splitter = new Splitter(this)
+ {
+ public Token tokenForValue(BigInteger value)
+ {
+ return new LongToken(value.longValue());
+ }
+
+ public BigInteger valueForToken(Token token)
+ {
+ return BigInteger.valueOf(((LongToken) token).token);
+ }
+ };
+
public DecoratedKey decorateKey(ByteBuffer key)
{
long[] hash = getHash(key);
@@ -291,8 +304,18 @@ public class Murmur3Partitioner implements IPartitioner
return LongType.instance;
}
+ public Token getMaximumToken()
+ {
+ return new LongToken(Long.MAX_VALUE);
+ }
+
public AbstractType<?> partitionOrdering()
{
return partitionOrdering;
}
+
+ public Optional<Splitter> splitter()
+ {
+ return Optional.of(splitter);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index b0dea01..96a96ca 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -50,6 +50,19 @@ public class RandomPartitioner implements IPartitioner
public static final RandomPartitioner instance = new RandomPartitioner();
public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
+ private final Splitter splitter = new Splitter(this)
+ {
+ public Token tokenForValue(BigInteger value)
+ {
+ return new BigIntegerToken(value);
+ }
+
+ public BigInteger valueForToken(Token token)
+ {
+ return ((BigIntegerToken)token).getTokenValue();
+ }
+ };
+
public DecoratedKey decorateKey(ByteBuffer key)
{
return new CachedHashDecoratedKey(getToken(key), key);
@@ -194,6 +207,11 @@ public class RandomPartitioner implements IPartitioner
return ownerships;
}
+ public Token getMaximumToken()
+ {
+ return new BigIntegerToken(MAXIMUM);
+ }
+
public AbstractType<?> getTokenValidator()
{
return IntegerType.instance;
@@ -203,4 +221,10 @@ public class RandomPartitioner implements IPartitioner
{
return partitionOrdering;
}
+
+ public Optional<Splitter> splitter()
+ {
+ return Optional.of(splitter);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 1fc6c46..3cc3b23 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -473,6 +473,23 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
return new Range<T>(left, newRight);
}
+ public static <T extends RingPosition<T>> List<Range<T>> sort(Collection<Range<T>> ranges)
+ {
+ List<Range<T>> output = new ArrayList<>(ranges.size());
+ for (Range<T> r : ranges)
+ output.addAll(r.unwrap());
+ // sort by left
+ Collections.sort(output, new Comparator<Range<T>>()
+ {
+ public int compare(Range<T> b1, Range<T> b2)
+ {
+ return b1.left.compareTo(b2.left);
+ }
+ });
+ return output;
+ }
+
+
/**
* Compute a range of keys corresponding to a given range of token.
*/
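
A small usage sketch (illustrative only, assuming Murmur3 LongTokens) of the new Range.sort helper: it unwraps any wrap-around ranges and orders the result by left token, which the Splitter and disk-boundary code added below rely on:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;

    List<Range<Token>> sorted = Range.sort(Arrays.asList(
            new Range<Token>(new LongToken(100), new LongToken(200)),
            new Range<Token>(new LongToken(-500), new LongToken(-100))));
    // sorted by left token: (-500,-100] comes before (100,200]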
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Splitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java
new file mode 100644
index 0000000..67b578d
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/Splitter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Partition splitter.
+ */
+public abstract class Splitter
+{
+ private final IPartitioner partitioner;
+
+ protected Splitter(IPartitioner partitioner)
+ {
+ this.partitioner = partitioner;
+ }
+
+ protected abstract Token tokenForValue(BigInteger value);
+
+ protected abstract BigInteger valueForToken(Token token);
+
+ public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges)
+ {
+ if (localRanges.isEmpty() || parts == 1)
+ return Collections.singletonList(partitioner.getMaximumToken());
+
+ BigInteger totalTokens = BigInteger.ZERO;
+ for (Range<Token> r : localRanges)
+ {
+ BigInteger right = valueForToken(token(r.right));
+ totalTokens = totalTokens.add(right.subtract(valueForToken(r.left)));
+ }
+ BigInteger perPart = totalTokens.divide(BigInteger.valueOf(parts));
+
+ if (dontSplitRanges)
+ return splitOwnedRangesNoPartialRanges(localRanges, perPart, parts);
+
+ List<Token> boundaries = new ArrayList<>();
+ BigInteger sum = BigInteger.ZERO;
+ for (Range<Token> r : localRanges)
+ {
+ Token right = token(r.right);
+ BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)).abs();
+ BigInteger left = valueForToken(r.left);
+ while (sum.add(currentRangeWidth).compareTo(perPart) >= 0)
+ {
+ BigInteger withinRangeBoundary = perPart.subtract(sum);
+ left = left.add(withinRangeBoundary);
+ boundaries.add(tokenForValue(left));
+ currentRangeWidth = currentRangeWidth.subtract(withinRangeBoundary);
+ sum = BigInteger.ZERO;
+ }
+ sum = sum.add(currentRangeWidth);
+ }
+ boundaries.set(boundaries.size() - 1, partitioner.getMaximumToken());
+
+ assert boundaries.size() == parts : boundaries.size() +"!="+parts+" "+boundaries+":"+localRanges;
+ return boundaries;
+ }
+
+ private List<Token> splitOwnedRangesNoPartialRanges(List<Range<Token>> localRanges, BigInteger perPart, int parts)
+ {
+ List<Token> boundaries = new ArrayList<>(parts);
+ BigInteger sum = BigInteger.ZERO;
+ int i = 0;
+ while (boundaries.size() < parts - 1)
+ {
+ Range<Token> r = localRanges.get(i);
+ Range<Token> nextRange = localRanges.get(i + 1);
+ Token right = token(r.right);
+ Token nextRight = token(nextRange.right);
+
+ BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left));
+ BigInteger nextRangeWidth = valueForToken(nextRight).subtract(valueForToken(nextRange.left));
+ sum = sum.add(currentRangeWidth);
+ // does this or next range take us beyond the per part limit?
+ if (sum.compareTo(perPart) > 0 || sum.add(nextRangeWidth).compareTo(perPart) > 0)
+ {
+ // Either this or the next range will take us beyond the perPart limit. Will stopping now or
+ // adding the next range create the smallest difference to perPart?
+ BigInteger diffCurrent = sum.subtract(perPart).abs();
+ BigInteger diffNext = sum.add(nextRangeWidth).subtract(perPart).abs();
+ if (diffNext.compareTo(diffCurrent) >= 0)
+ {
+ sum = BigInteger.ZERO;
+ boundaries.add(right);
+ }
+ }
+ i++;
+ }
+ boundaries.add(partitioner.getMaximumToken());
+ return boundaries;
+ }
+
+ /**
+ * We avoid calculating for wrap-around ranges; instead we use the actual max token, and then, when translating
+ * to PartitionPositions, we include tokens from .minKeyBound to .maxKeyBound to make sure we include all tokens.
+ */
+ private Token token(Token t)
+ {
+ return t.equals(partitioner.getMinimumToken()) ? partitioner.getMaximumToken() : t;
+ }
+
+}
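
A minimal sketch of how splitOwnedRanges is meant to be used (illustrative; assumes Murmur3Partitioner and a single owned range): it asks for one part per data directory and returns that many upper-bound tokens, the last of which is always the partitioner's maximum token:

    import java.util.Collections;
    import java.util.List;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Splitter;
    import org.apache.cassandra.dht.Token;

    IPartitioner partitioner = Murmur3Partitioner.instance;
    Splitter splitter = partitioner.splitter().get();
    List<Range<Token>> localRanges = Collections.singletonList(
            new Range<Token>(new LongToken(0), new LongToken(4000)));
    // Four parts, e.g. one per data directory; passing false (the single-token case in
    // StorageService.getDiskBoundaries) allows boundaries to fall inside a range.
    List<Token> boundaries = splitter.splitOwnedRanges(4, localRanges, false);
    // boundaries == [1000, 2000, 3000, maximum token], one upper bound per part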
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 68dbd74..2217ae2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -35,9 +35,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
public class SimpleSSTableMultiWriter implements SSTableMultiWriter
{
private final SSTableWriter writer;
+ private final LifecycleTransaction txn;
- protected SimpleSSTableMultiWriter(SSTableWriter writer)
+ protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
{
+ this.txn = txn;
this.writer = writer;
}
@@ -90,6 +92,7 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
public Throwable abort(Throwable accumulate)
{
+ txn.untrackNew(writer);
return writer.abort(accumulate);
}
@@ -114,6 +117,6 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
LifecycleTransaction txn)
{
SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn);
- return new SimpleSSTableMultiWriter(writer);
+ return new SimpleSSTableMultiWriter(writer, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
new file mode 100644
index 0000000..674ed7f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.service.StorageService;
+
+public class RangeAwareSSTableWriter implements SSTableMultiWriter
+{
+ private final List<PartitionPosition> boundaries;
+ private final Directories.DataDirectory[] directories;
+ private final int sstableLevel;
+ private final long estimatedKeys;
+ private final long repairedAt;
+ private final SSTableFormat.Type format;
+ private final SerializationHeader.Component header;
+ private final LifecycleTransaction txn;
+ private int currentIndex = -1;
+ public final ColumnFamilyStore cfs;
+ private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
+ private final List<SSTableReader> finishedReaders = new ArrayList<>();
+ private SSTableMultiWriter currentWriter = null;
+
+ public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader.Component header) throws IOException
+ {
+ directories = cfs.getDirectories().getWriteableLocations();
+ this.sstableLevel = sstableLevel;
+ this.cfs = cfs;
+ this.estimatedKeys = estimatedKeys / directories.length;
+ this.repairedAt = repairedAt;
+ this.format = format;
+ this.txn = txn;
+ this.header = header;
+ boundaries = StorageService.getDiskBoundaries(cfs, directories);
+ if (boundaries == null)
+ {
+ Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
+ if (localDir == null)
+ throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
+ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
+ }
+ }
+
+ private void maybeSwitchWriter(DecoratedKey key)
+ {
+ if (boundaries == null)
+ return;
+
+ boolean switched = false;
+ while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0)
+ {
+ switched = true;
+ currentIndex++;
+ }
+
+ if (switched)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
+ currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
+ }
+ }
+
+ public boolean append(UnfilteredRowIterator partition)
+ {
+ maybeSwitchWriter(partition.partitionKey());
+ return currentWriter.append(partition);
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ {
+ if (writer.getFilePointer() > 0)
+ finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult));
+ else
+ SSTableMultiWriter.abortOrDie(writer);
+ }
+ return finishedReaders;
+ }
+
+ @Override
+ public Collection<SSTableReader> finish(boolean openResult)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ {
+ if (writer.getFilePointer() > 0)
+ finishedReaders.addAll(writer.finish(openResult));
+ else
+ SSTableMultiWriter.abortOrDie(writer);
+ }
+ return finishedReaders;
+ }
+
+ @Override
+ public Collection<SSTableReader> finished()
+ {
+ return finishedReaders;
+ }
+
+ @Override
+ public SSTableMultiWriter setOpenResult(boolean openResult)
+ {
+ finishedWriters.forEach((w) -> w.setOpenResult(openResult));
+ currentWriter.setOpenResult(openResult);
+ return this;
+ }
+
+ public String getFilename()
+ {
+ return String.join("/", cfs.keyspace.getName(), cfs.getTableName());
+ }
+
+ @Override
+ public long getFilePointer()
+ {
+ return currentWriter.getFilePointer();
+ }
+
+ @Override
+ public UUID getCfId()
+ {
+ return currentWriter.getCfId();
+ }
+
+ @Override
+ public Throwable commit(Throwable accumulate)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter writer : finishedWriters)
+ accumulate = writer.commit(accumulate);
+ return accumulate;
+ }
+
+ @Override
+ public Throwable abort(Throwable accumulate)
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ for (SSTableMultiWriter finishedWriter : finishedWriters)
+ accumulate = finishedWriter.abort(accumulate);
+
+ return accumulate;
+ }
+
+ @Override
+ public void prepareToCommit()
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ finishedWriters.forEach(SSTableMultiWriter::prepareToCommit);
+ }
+
+ @Override
+ public void close()
+ {
+ if (currentWriter != null)
+ finishedWriters.add(currentWriter);
+ currentWriter = null;
+ finishedWriters.forEach(SSTableMultiWriter::close);
+ }
+}
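
A hedged sketch (not part of the patch) of the invariant maybeSwitchWriter relies on: the boundaries from StorageService.getDiskBoundaries are sorted upper bounds, one per data directory, so a key belongs to the first directory whose boundary is not smaller than it:

    import java.util.List;
    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.db.PartitionPosition;

    final class DiskIndexSketch
    {
        // Illustrative only: pick the directory index for a key the same way
        // maybeSwitchWriter advances currentIndex (a linear scan over sorted upper bounds).
        static int directoryIndexFor(DecoratedKey key, List<PartitionPosition> boundaries)
        {
            int index = 0;
            while (key.compareTo(boundaries.get(index)) > 0)
                index++;
            return index; // keys <= boundaries.get(0) land in directories[0], and so on
        }
    }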
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 925efd6..a083218 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -24,7 +24,12 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
{
protected Directories.DataDirectory getWriteDirectory(long writeSize)
{
- Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize);
+ Directories.DataDirectory directory;
+ directory = getDirectory();
+
+ if (directory == null) // ok panic - write anywhere
+ directory = getDirectories().getWriteableLocation(writeSize);
+
if (directory == null)
throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
@@ -36,4 +41,14 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
* @return Directories instance for the CF.
*/
protected abstract Directories getDirectories();
+ protected abstract Directories.DataDirectory getDirectory();
+
+ /**
+ * Called if no disk is available with free space for the full write size.
+ * @return true if the scope of the task was successfully reduced.
+ */
+ public boolean reduceScopeForLimitedSpace()
+ {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 60ede18..24bebae 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RangeStreamer;
import org.apache.cassandra.dht.RingPosition;
+import org.apache.cassandra.dht.Splitter;
import org.apache.cassandra.dht.StreamStateStore;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.Token.TokenFactory;
@@ -2625,6 +2626,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ public int relocateSSTables(String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+ for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
+ {
+ CompactionManager.AllSSTableOpStatus oneStatus = cfs.relocateSSTables();
+ if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+ status = oneStatus;
+ }
+ return status.statusCode;
+ }
+
/**
* Takes the snapshot for the given keyspaces. A snapshot name must be specified.
*
@@ -4459,4 +4472,61 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB));
}
+ public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories)
+ {
+ if (!cfs.getPartitioner().splitter().isPresent())
+ return null;
+
+ Collection<Range<Token>> lr;
+
+ if (StorageService.instance.isBootstrapMode())
+ {
+ lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
+ }
+ else
+ {
+ // The reason we use the future-settled TMD is that if we decommission a node, we want to stream
+ // from that node to the correct location on disk; if we didn't, we would put new files in the wrong places.
+ // We do this to minimize the amount of data we need to move in rebalancedisks once everything has settled.
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled();
+ lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress());
+ }
+
+ if (lr == null || lr.isEmpty())
+ return null;
+ List<Range<Token>> localRanges = Range.sort(lr);
+
+ return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories);
+ }
+
+ public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs)
+ {
+ return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations());
+ }
+
+ /**
+ * Returns a list of disk boundaries; the result differs depending on whether vnodes are enabled or not.
+ *
+ * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
+ * getDiskBoundaries(..).get(0) should be on the first disk, everything between boundary 0 and boundary 1
+ * should be on the second disk, etc.
+ *
+ * The final entry in the returned list is always the upper key bound of the partitioner's maximum token.
+ *
+ * @param localRanges the token ranges owned by the local node
+ * @param partitioner the partitioner in use; must provide a splitter
+ * @param dataDirectories the data directories to compute boundaries for
+ * @return one upper-bound PartitionPosition per data directory
+ */
+ public static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+ {
+ assert partitioner.splitter().isPresent();
+ Splitter splitter = partitioner.splitter().get();
+ List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, DatabaseDescriptor.getNumTokens() > 1);
+ List<PartitionPosition> diskBoundaries = new ArrayList<>();
+ for (int i = 0; i < boundaries.size() - 1; i++)
+ diskBoundaries.add(boundaries.get(i).maxKeyBound());
+ diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
+ return diskBoundaries;
+ }
}
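
For illustration, a minimal sketch of calling the new static overload directly (not part of the patch; assumes a Murmur3 cluster and a cfs reference already in scope):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.cassandra.db.Directories;
    import org.apache.cassandra.db.PartitionPosition;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.service.StorageService;

    // Sort the locally owned ranges, then compute one upper PartitionPosition bound per directory.
    List<Range<Token>> localRanges = Range.sort(Arrays.asList(
            new Range<Token>(new LongToken(-4000), new LongToken(4000))));
    Directories.DataDirectory[] dirs = cfs.getDirectories().getWriteableLocations(); // cfs assumed in scope
    List<PartitionPosition> boundaries =
            StorageService.getDiskBoundaries(localRanges, Murmur3Partitioner.instance, dirs);
    // boundaries.size() == dirs.length; the last entry is getMaximumToken().maxKeyBound()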
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 70691c7..eef34c0 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -248,6 +248,7 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
+ public int relocateSSTables(String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException;
/**
* Trigger a cleanup of keys on a single keyspace
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 87dcda0..61eb13f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -35,9 +35,9 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -63,8 +63,6 @@ public class StreamReader
protected final int sstableLevel;
protected final SerializationHeader.Component header;
- protected Descriptor desc;
-
public StreamReader(FileMessageHeader header, StreamSession session)
{
this.session = session;
@@ -108,7 +106,7 @@ public class StreamReader
{
writePartition(deserializer, writer);
// TODO move this to BytesReadTracker
- session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+ session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
return writer;
}
@@ -132,10 +130,7 @@ public class StreamReader
if (localDir == null)
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
- desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
-
-
- return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
+ return new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
}
protected void drain(InputStream dis, long bytesRead) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5355c3e..9078acc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -584,9 +584,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
receivers.get(message.header.cfId).received(message.sstable);
}
- public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
+ public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
{
- ProgressInfo progress = new ProgressInfo(peer, index, desc.filenameFor(Component.DATA), direction, bytes, total);
+ ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total);
streamResult.handleProgress(progress);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 106677c..ca35c0b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -102,7 +102,7 @@ public class StreamWriter
long lastBytesRead = write(file, validator, readOffset, length, bytesRead);
bytesRead += lastBytesRead;
progress += (lastBytesRead - readOffset);
- session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+ session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
readOffset = 0;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4d10244..c123102 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
import com.google.common.base.Throwables;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,7 +96,7 @@ public class CompressedStreamReader extends StreamReader
{
writePartition(deserializer, writer);
// when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
- session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+ session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
}
}
return writer;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index adbd091..93e0f76 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -26,6 +26,7 @@ import java.util.List;
import com.google.common.base.Function;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -75,7 +76,7 @@ public class CompressedStreamWriter extends StreamWriter
long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
bytesTransferred += lastWrite;
progress += lastWrite;
- session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+ session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1078004..6f1c753 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -299,6 +299,11 @@ public class NodeProbe implements AutoCloseable
ssProxy.forceKeyspaceCompaction(splitOutput, keyspaceName, tableNames);
}
+ public void relocateSSTables(String keyspace, String[] cfnames) throws IOException, ExecutionException, InterruptedException
+ {
+ ssProxy.relocateSSTables(keyspace, cfnames);
+ }
+
public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 668c075..9728356 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -134,7 +134,8 @@ public class NodeTool
DisableHintsForDC.class,
EnableHintsForDC.class,
FailureDetectorInfo.class,
- RefreshSizeEstimates.class
+ RefreshSizeEstimates.class,
+ RelocateSSTables.class
);
Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index b27b07a..b62512a 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -17,18 +17,20 @@
*/
package org.apache.cassandra.tools;
+import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -96,7 +98,7 @@ public class SSTableOfflineRelevel
Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
- Set<SSTableReader> sstables = new HashSet<>();
+ SetMultimap<File, SSTableReader> sstableMultimap = HashMultimap.create();
for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
{
if (sstable.getKey() != null)
@@ -104,7 +106,7 @@ public class SSTableOfflineRelevel
try
{
SSTableReader reader = SSTableReader.open(sstable.getKey());
- sstables.add(reader);
+ sstableMultimap.put(reader.descriptor.directory, reader);
}
catch (Throwable t)
{
@@ -113,13 +115,20 @@ public class SSTableOfflineRelevel
}
}
}
- if (sstables.isEmpty())
+ if (sstableMultimap.isEmpty())
{
out.println("No sstables to relevel for "+keyspace+"."+columnfamily);
System.exit(1);
}
- Relevel rl = new Relevel(sstables);
- rl.relevel(dryRun);
+ for (File directory : sstableMultimap.keySet())
+ {
+ if (!sstableMultimap.get(directory).isEmpty())
+ {
+ Relevel rl = new Relevel(sstableMultimap.get(directory));
+ out.println("For sstables in " + directory + ":");
+ rl.relevel(dryRun);
+ }
+ }
System.exit(0);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
new file mode 100644
index 0000000..8522bc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+@Command(name = "relocatesstables", description = "Relocates sstables to the correct disk")
+public class RelocateSSTables extends NodeTool.NodeToolCmd
+{
+ @Arguments(usage = "<keyspace> <table>", description = "The keyspace and table name")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ List<String> keyspaces = parseOptionalKeyspace(args, probe);
+ String[] cfnames = parseOptionalTables(args);
+ try
+ {
+ for (String keyspace : keyspaces)
+ probe.relocateSSTables(keyspace, cfnames);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Got error while relocating", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 96ee072..0f05524 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -59,8 +59,8 @@ public class LongLeveledCompactionStrategyTest
Keyspace keyspace = Keyspace.open(ksname);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname);
store.disableAutoCompaction();
-
- LeveledCompactionStrategy lcs = (LeveledCompactionStrategy)store.getCompactionStrategyManager().getStrategies().get(1);
+ CompactionStrategyManager mgr = store.getCompactionStrategyManager();
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) mgr.getStrategies().get(1).get(0);
ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 27b774d..824c533 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -633,14 +633,14 @@ public class ScrubTest
{
SerializationHeader header = new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS);
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(0);
- return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn));
+ return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn), txn);
}
private static class TestMultiWriter extends SimpleSSTableMultiWriter
{
- TestMultiWriter(SSTableWriter writer)
+ TestMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
{
- super(writer);
+ super(writer, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 3c0098b..7fee251 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compaction;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.junit.Test;
@@ -215,9 +216,9 @@ public class CompactionsCQLTest extends CQLTester
public boolean verifyStrategies(CompactionStrategyManager manager, Class<? extends AbstractCompactionStrategy> expected)
{
boolean found = false;
- for (AbstractCompactionStrategy actualStrategy : manager.getStrategies())
+ for (List<AbstractCompactionStrategy> strategies : manager.getStrategies())
{
- if (!actualStrategy.getClass().equals(expected))
+ if (!strategies.stream().allMatch((strategy) -> strategy.getClass().equals(expected)))
return false;
found = true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index f2ddb00..1676896 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
+import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -122,10 +123,11 @@ public class LeveledCompactionStrategyTest
}
waitForLeveling(cfs);
- CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
+ CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
// Checking we're not completely bad at math
- int l1Count = strategy.getSSTableCountPerLevel()[1];
- int l2Count = strategy.getSSTableCountPerLevel()[2];
+
+ int l1Count = strategyManager.getSSTableCountPerLevel()[1];
+ int l2Count = strategyManager.getSSTableCountPerLevel()[2];
if (l1Count == 0 || l2Count == 0)
{
logger.error("L1 or L2 has 0 sstables. Expected > 0 on both.");
@@ -177,10 +179,10 @@ public class LeveledCompactionStrategyTest
}
waitForLeveling(cfs);
- CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
+ CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
// Checking we're not completely bad at math
- assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
- assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
+ assertTrue(strategyManager.getSSTableCountPerLevel()[1] > 0);
+ assertTrue(strategyManager.getSSTableCountPerLevel()[2] > 0);
Range<Token> range = new Range<>(Util.token(""), Util.token(""));
int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
@@ -196,7 +198,7 @@ public class LeveledCompactionStrategyTest
*/
private void waitForLeveling(ColumnFamilyStore cfs) throws InterruptedException
{
- CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
+ CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
// L0 is the lowest priority, so when that's done, we know everything is done
while (strategy.getSSTableCountPerLevel()[0] > 1)
Thread.sleep(100);
@@ -224,7 +226,7 @@ public class LeveledCompactionStrategyTest
}
waitForLeveling(cfs);
- LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1);
+ LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
assert strategy.getLevelSize(1) > 0;
// get LeveledScanner for level 1 sstables
@@ -260,7 +262,7 @@ public class LeveledCompactionStrategyTest
cfs.forceBlockingFlush();
}
cfs.forceBlockingFlush();
- LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1);
+ LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
cfs.forceMajorCompaction();
for (SSTableReader s : cfs.getLiveSSTables())
@@ -306,14 +308,14 @@ public class LeveledCompactionStrategyTest
while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
Thread.sleep(100);
- CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
- List<AbstractCompactionStrategy> strategies = strategy.getStrategies();
- LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0);
- LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1);
+ CompactionStrategyManager manager = cfs.getCompactionStrategyManager();
+ List<List<AbstractCompactionStrategy>> strategies = manager.getStrategies();
+ LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0).get(0);
+ LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1).get(0);
assertEquals(0, repaired.manifest.getLevelCount() );
assertEquals(2, unrepaired.manifest.getLevelCount());
- assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
- assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
+ assertTrue(manager.getSSTableCountPerLevel()[1] > 0);
+ assertTrue(manager.getSSTableCountPerLevel()[2] > 0);
for (SSTableReader sstable : cfs.getLiveSSTables())
assertFalse(sstable.isRepaired());
@@ -331,7 +333,7 @@ public class LeveledCompactionStrategyTest
sstable1.reloadSSTableMetadata();
assertTrue(sstable1.isRepaired());
- strategy.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
+ manager.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
int repairedSSTableCount = 0;
for (List<SSTableReader> level : repaired.manifest.generations)
@@ -343,7 +345,7 @@ public class LeveledCompactionStrategyTest
assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
unrepaired.removeSSTable(sstable2);
- strategy.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable2)), this);
+ manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this);
assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 7b9b19c..4f49389 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -297,7 +297,7 @@ public class TrackerTest
Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
- tracker.replaceFlushed(prev2, Collections.singleton(reader));
+ tracker.replaceFlushed(prev2, singleton(reader));
Assert.assertEquals(1, tracker.getView().sstables.size());
Assert.assertEquals(1, listener.received.size());
Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
@@ -314,13 +314,13 @@ public class TrackerTest
tracker.markFlushing(prev1);
reader = MockSchema.sstable(0, 10, true, cfs);
cfs.invalidate(false);
- tracker.replaceFlushed(prev1, Collections.singleton(reader));
+ tracker.replaceFlushed(prev1, singleton(reader));
Assert.assertEquals(0, tracker.getView().sstables.size());
Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
Assert.assertEquals(3, listener.received.size());
Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
- Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
+ Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 523c203..e787cc4 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.ImmutableSet.of;
import static com.google.common.collect.Iterables.concat;
+import static java.util.Collections.singleton;
import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
public class ViewTest
@@ -195,7 +196,7 @@ public class ViewTest
Assert.assertEquals(memtable3, cur.getCurrentMemtable());
SSTableReader sstable = MockSchema.sstable(1, cfs);
- cur = View.replaceFlushed(memtable1, Collections.singleton(sstable)).apply(cur);
+ cur = View.replaceFlushed(memtable1, singleton(sstable)).apply(cur);
Assert.assertEquals(0, cur.flushingMemtables.size());
Assert.assertEquals(1, cur.liveMemtables.size());
Assert.assertEquals(memtable3, cur.getCurrentMemtable());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
index 9cefbf2..e2202fe 100644
--- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
+++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
@@ -61,6 +61,12 @@ public class LengthPartitioner implements IPartitioner
return MINIMUM;
}
+ @Override
+ public Token getMaximumToken()
+ {
+ return null;
+ }
+
public BigIntegerToken getRandomToken()
{
return new BigIntegerToken(BigInteger.valueOf(new Random().nextInt(15)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/dht/SplitterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java
new file mode 100644
index 0000000..751a7d7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SplitterTest
+{
+
+ @Test
+ public void randomSplitTestNoVNodesRandomPartitioner()
+ {
+ randomSplitTestNoVNodes(new RandomPartitioner());
+ }
+
+ @Test
+ public void randomSplitTestNoVNodesMurmur3Partitioner()
+ {
+ randomSplitTestNoVNodes(new Murmur3Partitioner());
+ }
+
+ @Test
+ public void randomSplitTestVNodesRandomPartitioner()
+ {
+ randomSplitTestVNodes(new RandomPartitioner());
+ }
+ @Test
+ public void randomSplitTestVNodesMurmur3Partitioner()
+ {
+ randomSplitTestVNodes(new Murmur3Partitioner());
+ }
+
+ public void randomSplitTestNoVNodes(IPartitioner partitioner)
+ {
+ Splitter splitter = partitioner.splitter().get();
+ Random r = new Random();
+ for (int i = 0; i < 10000; i++)
+ {
+ List<Range<Token>> localRanges = generateLocalRanges(1, r.nextInt(4)+1, splitter, r, partitioner instanceof RandomPartitioner);
+ List<Token> boundaries = splitter.splitOwnedRanges(r.nextInt(9) + 1, localRanges, false);
+ assertTrue("boundaries = "+boundaries+" ranges = "+localRanges, assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true));
+ }
+ }
+
+ public void randomSplitTestVNodes(IPartitioner partitioner)
+ {
+ Splitter splitter = partitioner.splitter().get();
+ Random r = new Random();
+ for (int i = 0; i < 10000; i++)
+ {
+ // we need many tokens to be able to split evenly over the disks
+ int numTokens = 172 + r.nextInt(128);
+ int rf = r.nextInt(4) + 2;
+ int parts = r.nextInt(5)+1;
+ List<Range<Token>> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner);
+ List<Token> boundaries = splitter.splitOwnedRanges(parts, localRanges, true);
+ if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, false))
+ fail(String.format("Could not split %d tokens with rf=%d into %d parts (localRanges=%s, boundaries=%s)", numTokens, rf, parts, localRanges, boundaries));
+ }
+ }
+
+ private boolean assertRangeSizeEqual(List<Range<Token>> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
+ {
+ Token start = partitioner.getMinimumToken();
+ List<BigInteger> splits = new ArrayList<>();
+
+ for (int i = 0; i < tokens.size(); i++)
+ {
+ Token end = i == tokens.size() - 1 ? partitioner.getMaximumToken() : tokens.get(i);
+ splits.add(sumOwnedBetween(localRanges, start, end, splitter, splitIndividualRanges));
+ start = end;
+ }
+ // when we don't need to keep around full ranges, the difference is small between the partitions
+ BigDecimal delta = splitIndividualRanges ? BigDecimal.valueOf(0.001) : BigDecimal.valueOf(0.2);
+ boolean allBalanced = true;
+ for (BigInteger b : splits)
+ {
+ for (BigInteger i : splits)
+ {
+ BigDecimal bdb = new BigDecimal(b);
+ BigDecimal bdi = new BigDecimal(i);
+ BigDecimal q = bdb.divide(bdi, 2, BigDecimal.ROUND_HALF_DOWN);
+ if (q.compareTo(BigDecimal.ONE.add(delta)) > 0 || q.compareTo(BigDecimal.ONE.subtract(delta)) < 0)
+ allBalanced = false;
+ }
+ }
+ return allBalanced;
+ }
+
+ private BigInteger sumOwnedBetween(List<Range<Token>> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
+ {
+ BigInteger sum = BigInteger.ZERO;
+ for (Range<Token> range : localRanges)
+ {
+ if (splitIndividualRanges)
+ {
+ Set<Range<Token>> intersections = new Range<>(start, end).intersectionWith(range);
+ for (Range<Token> intersection : intersections)
+ sum = sum.add(splitter.valueForToken(intersection.right).subtract(splitter.valueForToken(intersection.left)));
+ }
+ else
+ {
+ if (new Range<>(start, end).contains(range.left))
+ sum = sum.add(splitter.valueForToken(range.right).subtract(splitter.valueForToken(range.left)));
+ }
+ }
+ return sum;
+ }
+
+ private List<Range<Token>> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
+ {
+ int localTokens = numTokens * rf;
+ List<Token> randomTokens = new ArrayList<>();
+
+ for (int i = 0; i < localTokens * 2; i++)
+ {
+ Token t = splitter.tokenForValue(randomPartitioner ? new BigInteger(127, r) : BigInteger.valueOf(r.nextLong()));
+ randomTokens.add(t);
+ }
+
+ Collections.sort(randomTokens);
+
+ List<Range<Token>> localRanges = new ArrayList<>(localTokens);
+ for (int i = 0; i < randomTokens.size() - 1; i++)
+ {
+ assert randomTokens.get(i).compareTo(randomTokens.get(i+1)) < 0;
+ localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i+1)));
+ i++;
+ }
+ return localRanges;
+ }
+}
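A note on the tolerance check in assertRangeSizeEqual above: it requires every pair of per-split ownership sums to stay within the chosen delta of each other. A minimal standalone sketch of that invariant, using only what the test already shows (class and method names here are illustrative):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;

public class BalanceCheckSketch
{
    // True when every pairwise ratio of ownership sums lies within 1 +/- delta.
    static boolean isBalanced(List<BigInteger> splits, BigDecimal delta)
    {
        BigDecimal upper = BigDecimal.ONE.add(delta);
        BigDecimal lower = BigDecimal.ONE.subtract(delta);
        for (BigInteger a : splits)
        {
            for (BigInteger b : splits)
            {
                BigDecimal ratio = new BigDecimal(a).divide(new BigDecimal(b), 2, BigDecimal.ROUND_HALF_DOWN);
                if (ratio.compareTo(upper) > 0 || ratio.compareTo(lower) < 0)
                    return false;
            }
        }
        return true;
    }
}

The test uses delta = 0.001 when individual ranges may be split, and 0.2 otherwise, since whole vnode ranges cannot be divided evenly across the parts.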
[2/2] cassandra git commit: Make sure the same token does not exist
in several data directories
Posted by ma...@apache.org.
Make sure the same token does not exist in several data directories
Patch by marcuse; reviewed by Yuki Morishita and Carl Yeksigian for CASSANDRA-6696
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2c63418
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2c63418
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2c63418
Branch: refs/heads/cassandra-3.2
Commit: e2c6341898fa43b0e262ef031f267587050b8d0f
Parents: 1568293
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 3 10:12:47 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jan 5 16:48:50 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 5 +
conf/cassandra.yaml | 13 +-
.../org/apache/cassandra/config/Config.java | 2 +-
.../cassandra/config/DatabaseDescriptor.java | 3 -
.../cassandra/db/BlacklistedDirectories.java | 10 +
.../db/BlacklistedDirectoriesMBean.java | 9 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 97 ++++-
.../org/apache/cassandra/db/Directories.java | 36 +-
src/java/org/apache/cassandra/db/Memtable.java | 193 +++++-----
.../db/compaction/CompactionManager.java | 78 +++-
.../compaction/CompactionStrategyManager.java | 361 +++++++++++++------
.../cassandra/db/compaction/CompactionTask.java | 2 +-
.../DateTieredCompactionStrategy.java | 3 +-
.../db/compaction/LeveledManifest.java | 2 +-
.../cassandra/db/compaction/OperationType.java | 3 +-
.../cassandra/db/compaction/Scrubber.java | 7 +-
.../SizeTieredCompactionStrategy.java | 9 +-
.../writers/CompactionAwareWriter.java | 91 ++++-
.../writers/DefaultCompactionWriter.java | 17 +-
.../writers/MajorLeveledCompactionWriter.java | 53 +--
.../writers/MaxSSTableSizeWriter.java | 22 +-
.../SplittingSizeTieredCompactionWriter.java | 17 +-
.../apache/cassandra/db/lifecycle/Tracker.java | 4 +-
.../org/apache/cassandra/db/lifecycle/View.java | 2 +-
.../org/apache/cassandra/dht/IPartitioner.java | 17 +
.../cassandra/dht/Murmur3Partitioner.java | 23 ++
.../apache/cassandra/dht/RandomPartitioner.java | 24 ++
src/java/org/apache/cassandra/dht/Range.java | 17 +
src/java/org/apache/cassandra/dht/Splitter.java | 124 +++++++
.../io/sstable/SimpleSSTableMultiWriter.java | 7 +-
.../sstable/format/RangeAwareSSTableWriter.java | 205 +++++++++++
.../cassandra/io/util/DiskAwareRunnable.java | 17 +-
.../cassandra/service/StorageService.java | 70 ++++
.../cassandra/service/StorageServiceMBean.java | 1 +
.../cassandra/streaming/StreamReader.java | 11 +-
.../cassandra/streaming/StreamSession.java | 4 +-
.../cassandra/streaming/StreamWriter.java | 2 +-
.../compress/CompressedStreamReader.java | 3 +-
.../compress/CompressedStreamWriter.java | 3 +-
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/tools/NodeTool.java | 3 +-
.../cassandra/tools/SSTableOfflineRelevel.java | 21 +-
.../tools/nodetool/RelocateSSTables.java | 49 +++
.../LongLeveledCompactionStrategyTest.java | 4 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 6 +-
.../db/compaction/CompactionsCQLTest.java | 5 +-
.../LeveledCompactionStrategyTest.java | 36 +-
.../cassandra/db/lifecycle/TrackerTest.java | 6 +-
.../apache/cassandra/db/lifecycle/ViewTest.java | 3 +-
.../apache/cassandra/dht/LengthPartitioner.java | 6 +
.../org/apache/cassandra/dht/SplitterTest.java | 158 ++++++++
52 files changed, 1509 insertions(+), 361 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c3a50f..43acd43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.2
+ * Make sure tokens don't exist in several data directories (CASSANDRA-6696)
* Add requireAuthorization method to IAuthorizer (CASSANDRA-10852)
* Move static JVM options to conf/jvm.options file (CASSANDRA-10494)
* Fix CassandraVersion to accept x.y version string (CASSANDRA-10931)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1269c98..f5f50c1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,11 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - We now make sure that a token does not exist in several data directories. This
+ means that we run one compaction strategy per data_file_directory and we use
+ one thread per directory to flush. Use nodetool relocatesstables to make sure your
+ tokens are in the correct place, or just wait and compaction will handle it. See
+ CASSANDRA-6696 for more details.
- bound maximum in-flight commit log replay mutation bytes to 64 megabytes
tunable via cassandra.commitlog_max_outstanding_replay_bytes
- Support for type casting has been added to the selection clause.
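For operators, the paragraph above boils down to a one-off run per affected table, for example "nodetool relocatesstables mykeyspace mytable" (keyspace and table names are placeholders). Leaving the arguments off appears to walk every keyspace via parseOptionalKeyspace, and sstables that are already on the correct disk are filtered out before any compaction is started, so re-running the command should be inexpensive.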
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 74e1d1d..a8a90ba 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -413,14 +413,13 @@ memtable_allocation_type: heap_buffers
# This sets the amount of memtable flush writer threads. These will
# be blocked by disk io, and each one will hold a memtable in memory
-# while blocked.
+# while blocked.
#
-# memtable_flush_writers defaults to the smaller of (number of disks,
-# number of cores), with a minimum of 2 and a maximum of 8.
-#
-# If your data directories are backed by SSD, you should increase this
-# to the number of cores.
-#memtable_flush_writers: 8
+# memtable_flush_writers defaults to one per data_file_directory.
+#
+# If your data directories are backed by SSD, you can increase this, but
+# avoid having memtable_flush_writers * data_file_directories > number of cores
+#memtable_flush_writers: 1
# A fixed memory pool size in MB for for SSTable index summaries. If left
# empty, this will default to 5% of the heap size. If the memory usage of
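The memtable_flush_writers comment above suggests keeping memtable_flush_writers * data_file_directories at or below the number of cores. A hedged sketch of that arithmetic, as an illustration rather than anything the patch itself computes:

class FlushWriterSizing
{
    // Illustrative only: keep memtable_flush_writers * data_file_directories <= cores, with a floor of 1.
    static int suggestedFlushWriters(int cores, int dataFileDirectories)
    {
        return Math.max(1, cores / Math.max(1, dataFileDirectories));
    }
}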
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2bace5e..54cb089 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -98,7 +98,7 @@ public class Config
@Deprecated
public Integer concurrent_replicates = null;
- public Integer memtable_flush_writers = null;
+ public Integer memtable_flush_writers = 1;
public Integer memtable_heap_space_in_mb;
public Integer memtable_offheap_space_in_mb;
public Float memtable_cleanup_threshold = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3fc0b31..c82e930 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -582,9 +582,6 @@ public class DatabaseDescriptor
if (conf.hints_directory.equals(conf.saved_caches_directory))
throw new ConfigurationException("saved_caches_directory must not be the same as the hints_directory", false);
- if (conf.memtable_flush_writers == null)
- conf.memtable_flush_writers = Math.min(8, Math.max(2, Math.min(FBUtilities.getAvailableProcessors(), conf.data_file_directories.length)));
-
if (conf.memtable_flush_writers < 1)
throw new ConfigurationException("memtable_flush_writers must be at least 1, but was " + conf.memtable_flush_writers, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f47fd57..3e6332c 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -66,6 +66,16 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
return Collections.unmodifiableSet(unwritableDirectories);
}
+ public void markUnreadable(String path)
+ {
+ maybeMarkUnreadable(new File(path));
+ }
+
+ public void markUnwritable(String path)
+ {
+ maybeMarkUnwritable(new File(path));
+ }
+
/**
* Adds parent directory of the file (or the file itself, if it is a directory)
* to the set of unreadable directories.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java b/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
index 3163b9a..3fb9f39 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.db;
import java.io.File;
import java.util.Set;
-public interface BlacklistedDirectoriesMBean {
-
+public interface BlacklistedDirectoriesMBean
+{
public Set<File> getUnreadableDirectories();
public Set<File> getUnwritableDirectories();
-
+
+ public void markUnreadable(String path);
+
+ public void markUnwritable(String path);
}
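The two new MBean operations are meant to be driven from a JMX client. A minimal sketch of invoking markUnwritable remotely; the service URL, port, example path and the ObjectName "org.apache.cassandra.db:type=BlacklistedDirectories" are assumptions here, so verify the registered name on the target node before relying on it:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class MarkUnwritableSketch
{
    public static void main(String[] args) throws Exception
    {
        // Assumed local JMX endpoint and MBean name - check both on the node you are targeting.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName name = new ObjectName("org.apache.cassandra.db:type=BlacklistedDirectories");
            mbs.invoke(name, "markUnwritable",
                       new Object[]{ "/var/lib/cassandra/data-broken" },
                       new String[]{ "java.lang.String" });
        }
        finally
        {
            connector.close();
        }
    }
}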
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 738c941..6eefb5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -118,13 +118,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(1,
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("MemtableFlushWriter"),
"internal");
+ private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
+ static
+ {
+ for (int i = 0; i < DatabaseDescriptor.getAllDataFileLocations().length; i++)
+ {
+ perDiskflushExecutors[i] = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("PerDiskMemtableFlushWriter_"+i),
+ "internal");
+ }
+ }
+
// post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
StageManager.KEEPALIVE,
@@ -458,7 +472,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public Directories getDirectories()
{
- return directories;
+ // todo, hack since we need to know the data directories when constructing the compaction strategy
+ if (directories != null)
+ return directories;
+ return new Directories(metadata, initialDirectories);
}
public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
@@ -1033,11 +1050,74 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (Memtable memtable : memtables)
{
- // flush the memtable
- MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+ List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+ long totalBytesOnDisk = 0;
+ long maxBytesOnDisk = 0;
+ long minBytesOnDisk = Long.MAX_VALUE;
+ List<SSTableReader> sstables = new ArrayList<>();
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
+ {
+ // flush the memtable
+ List<Memtable.FlushRunnable> flushRunnables = memtable.flushRunnables(txn);
+
+ for (int i = 0; i < flushRunnables.size(); i++)
+ futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
+ List<SSTableMultiWriter> flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
+
+ try
+ {
+ Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+ while (writerIterator.hasNext())
+ {
+ SSTableMultiWriter writer = writerIterator.next();
+ if (writer.getFilePointer() > 0)
+ {
+ writer.setOpenResult(true).prepareToCommit();
+ }
+ else
+ {
+ maybeFail(writer.abort(null));
+ writerIterator.remove();
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ for (SSTableMultiWriter writer : flushResults)
+ t = writer.abort(t);
+ t = txn.abort(t);
+ Throwables.propagate(t);
+ }
+
+ txn.prepareToCommit();
+
+ Throwable accumulate = null;
+ for (SSTableMultiWriter writer : flushResults)
+ accumulate = writer.commit(accumulate);
+
+ maybeFail(txn.commit(accumulate));
+
+ for (SSTableMultiWriter writer : flushResults)
+ {
+ Collection<SSTableReader> flushedSSTables = writer.finished();
+ for (SSTableReader sstable : flushedSSTables)
+ {
+ if (sstable != null)
+ {
+ sstables.add(sstable);
+ long size = sstable.bytesOnDisk();
+ totalBytesOnDisk += size;
+ maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+ minBytesOnDisk = Math.min(minBytesOnDisk, size);
+ }
+ }
+ }
+ }
+ memtable.cfs.replaceFlushed(memtable, sstables);
reclaim(memtable);
+ logger.debug("Flushed to {} ({} sstables, {} bytes), biggest {} bytes, smallest {} bytes", sstables, sstables.size(), totalBytesOnDisk, maxBytesOnDisk, minBytesOnDisk);
}
-
// signal the post-flush we've done our work
postFlush.latch.countDown();
}
@@ -1366,6 +1446,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion);
}
+ public CompactionManager.AllSSTableOpStatus relocateSSTables() throws ExecutionException, InterruptedException
+ {
+ return CompactionManager.instance.relocateSSTables(this);
+ }
+
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
{
assert !sstables.isEmpty();
@@ -2206,7 +2291,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public int getUnleveledSSTables()
{
- return this.compactionStrategyManager.getUnleveledSSTables();
+ return compactionStrategyManager.getUnleveledSSTables();
}
public int[] getSSTableCountPerLevel()
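The flush path above is essentially a fork/join over per-disk writers: one flush runnable per data directory is submitted to its own executor, all results are awaited, and the whole set is then either committed together or aborted together. A stripped-down sketch of that shape with placeholder types (Writer and the method names are illustrative, not Cassandra APIs):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class PerDiskFlushSketch
{
    interface Writer { void commit(); void abort(); }

    // Assumes one task per executor, as in the patch; commits only when every disk succeeded.
    static List<Writer> flushAll(ExecutorService[] perDiskExecutors, List<Callable<Writer>> tasks) throws Exception
    {
        List<Future<Writer>> futures = new ArrayList<>();
        for (int i = 0; i < tasks.size(); i++)
            futures.add(perDiskExecutors[i].submit(tasks.get(i)));

        List<Writer> writers = new ArrayList<>();
        try
        {
            for (Future<Writer> future : futures)
                writers.add(future.get());     // surfaces any per-disk failure
            for (Writer writer : writers)
                writer.commit();
            return writers;
        }
        catch (Exception e)
        {
            for (Writer writer : writers)
                writer.abort();
            throw e;
        }
    }
}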
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 8744d43..a572bed 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -289,6 +290,19 @@ public class Directories
return null;
}
+ public DataDirectory getDataDirectoryForFile(File directory)
+ {
+ if (directory != null)
+ {
+ for (DataDirectory dataDirectory : dataDirectories)
+ {
+ if (directory.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
+ return dataDirectory;
+ }
+ }
+ return null;
+ }
+
public Descriptor find(String filename)
{
for (File dir : dataPaths)
@@ -403,7 +417,7 @@ public class Directories
for (DataDirectory dataDir : paths)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
- continue;
+ continue;
DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
// exclude directory if its total writeSize does not fit to data directory
if (candidate.availableSpace < writeSize)
@@ -413,6 +427,26 @@ public class Directories
return totalAvailable > expectedTotalWriteSize;
}
+ public DataDirectory[] getWriteableLocations()
+ {
+ List<DataDirectory> nonBlacklistedDirs = new ArrayList<>();
+ for (DataDirectory dir : dataDirectories)
+ {
+ if (!BlacklistedDirectories.isUnwritable(dir.location))
+ nonBlacklistedDirs.add(dir);
+ }
+
+ Collections.sort(nonBlacklistedDirs, new Comparator<DataDirectory>()
+ {
+ @Override
+ public int compare(DataDirectory o1, DataDirectory o2)
+ {
+ return o1.location.compareTo(o2.location);
+ }
+ });
+ return nonBlacklistedDirs.toArray(new DataDirectory[nonBlacklistedDirs.size()]);
+ }
+
public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
{
return getSnapshotDirectory(desc.directory, snapshotName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 96b1775..8e7a43c 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.File;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,14 +44,14 @@ import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.MemtablePool;
@@ -250,9 +249,32 @@ public class Memtable implements Comparable<Memtable>
return partitions.size();
}
- public FlushRunnable flushRunnable()
+ public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
{
- return new FlushRunnable(lastReplayPosition.get());
+ List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
+
+ if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
+ return Collections.singletonList(new FlushRunnable(lastReplayPosition.get(), txn));
+
+ return createFlushRunnables(localRanges, txn);
+ }
+
+ private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+ {
+ assert cfs.getPartitioner().splitter().isPresent();
+
+ Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+ List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+ List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
+ PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
+ ReplayPosition context = lastReplayPosition.get();
+ for (int i = 0; i < boundaries.size(); i++)
+ {
+ PartitionPosition t = boundaries.get(i);
+ runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn));
+ rangeStart = t;
+ }
+ return runnables;
}
public String toString()
@@ -312,23 +334,41 @@ public class Memtable implements Comparable<Memtable>
return creationTime;
}
- class FlushRunnable extends DiskAwareRunnable
+ class FlushRunnable implements Callable<SSTableMultiWriter>
{
- private final ReplayPosition context;
+ public final ReplayPosition context;
private final long estimatedSize;
+ private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
private final boolean isBatchLogTable;
+ private final SSTableMultiWriter writer;
+
+ // keeping these to be able to log what we are actually flushing
+ private final PartitionPosition from;
+ private final PartitionPosition to;
- FlushRunnable(ReplayPosition context)
+ FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
{
- this.context = context;
+ this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
+ }
+
+ FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
+ {
+ this(context, partitions, null, null, null, txn);
+ }
+ FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
+ {
+ this.context = context;
+ this.toFlush = toFlush;
+ this.from = from;
+ this.to = to;
long keySize = 0;
- for (PartitionPosition key : partitions.keySet())
+ for (PartitionPosition key : toFlush.keySet())
{
// make sure we don't write non-sensical keys
assert key instanceof DecoratedKey;
- keySize += ((DecoratedKey)key).getKey().remaining();
+ keySize += ((DecoratedKey) key).getKey().remaining();
}
estimatedSize = (long) ((keySize // index entries
+ keySize // keys in data file
@@ -336,21 +376,12 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
- }
- public long getExpectedWriteSize()
- {
- return estimatedSize;
- }
+ if (flushLocation == null)
+ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(getDirectories().getWriteableLocation(estimatedSize))), columnsCollector.get(), statsCollector.get());
+ else
+ writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
- protected void runMayThrow() throws Exception
- {
- long writeSize = getExpectedWriteSize();
- Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
- assert sstableDirectory != null : "Flush task is not bound to any disk";
- Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
- cfs.replaceFlushed(Memtable.this, sstables);
}
protected Directories getDirectories()
@@ -358,90 +389,64 @@ public class Memtable implements Comparable<Memtable>
return cfs.getDirectories();
}
- private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
+ private void writeSortedContents(ReplayPosition context)
{
- logger.debug("Writing {}", Memtable.this.toString());
+ logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : toFlush.values())
{
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
- {
- writer.append(iter);
- }
+ writer.append(iter);
}
}
+ }
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- context));
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ context));
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), context);
- writer.abort();
- ssTables = null;
- }
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString()));
+ }
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
+ String filename,
+ PartitionColumns columns,
+ EncodingStats stats)
+ {
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
+ return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+ (long)toFlush.size(),
+ ActiveRepairService.UNREPAIRED_SSTABLE,
+ sstableMetadataCollector,
+ new SerializationHeader(true, cfs.metadata, columns, stats), txn);
- return ssTables;
- }
}
- @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
- public SSTableTxnWriter createFlushWriter(String filename,
- PartitionColumns columns,
- EncodingStats stats)
+ @Override
+ public SSTableMultiWriter call()
{
- // we operate "offline" here, as we expose the resulting reader consciously when done
- // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
- LifecycleTransaction txn = null;
- try
- {
- txn = LifecycleTransaction.offline(OperationType.FLUSH);
- MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
- return new SSTableTxnWriter(txn,
- cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
- (long) partitions.size(),
- ActiveRepairService.UNREPAIRED_SSTABLE,
- sstableMetadataCollector,
- new SerializationHeader(true, cfs.metadata, columns, stats),
- txn));
- }
- catch (Throwable t)
- {
- if (txn != null)
- txn.close();
- throw t;
- }
+ writeSortedContents(context);
+ return writer;
}
}
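The per-disk flush in Memtable relies on the partition map being sorted, so each FlushRunnable simply operates on partitions.subMap(from, to) between two disk boundaries. A tiny standalone illustration of that slicing, with plain string keys purely for demonstration:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class MemtableSliceSketch
{
    // Slice a sorted partition map into one view per disk; the last boundary must sort above every key.
    static List<ConcurrentNavigableMap<String, String>> slice(ConcurrentSkipListMap<String, String> partitions,
                                                              List<String> boundaries, String minimumKey)
    {
        List<ConcurrentNavigableMap<String, String>> slices = new ArrayList<>(boundaries.size());
        String start = minimumKey;
        for (String end : boundaries)
        {
            slices.add(partitions.subMap(start, end)); // [start, end), mirroring partitions.subMap(from, to)
            start = end;
        }
        return slices;
    }
}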
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 571088b..1f39767 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
+import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
@@ -270,7 +271,7 @@ public class CompactionManager implements CompactionManagerMBean
Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
if (Iterables.isEmpty(sstables))
{
- logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+ logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
return AllSSTableOpStatus.SUCCESSFUL;
}
@@ -432,6 +433,77 @@ public class CompactionManager implements CompactionManagerMBean
}, OperationType.CLEANUP);
}
+ public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs) throws ExecutionException, InterruptedException
+ {
+ if (!cfs.getPartitioner().splitter().isPresent())
+ {
+ logger.info("Partitioner does not support splitting");
+ return AllSSTableOpStatus.ABORTED;
+ }
+ final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());
+
+ if (r.isEmpty())
+ {
+ logger.info("Relocate cannot run before a node has joined the ring");
+ return AllSSTableOpStatus.ABORTED;
+ }
+
+ final List<Range<Token>> localRanges = Range.sort(r);
+ final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+ final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+
+ return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
+ {
+ @Override
+ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
+ {
+ Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
+ Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
+ transaction.cancel(Sets.difference(originals, needsRelocation));
+
+ Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
+ CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
+
+ int maxSize = 0;
+ for (List<SSTableReader> diskSSTables : groupedByDisk.values())
+ maxSize = Math.max(maxSize, diskSSTables.size());
+
+ List<SSTableReader> mixedSSTables = new ArrayList<>();
+
+ for (int i = 0; i < maxSize; i++)
+ for (List<SSTableReader> diskSSTables : groupedByDisk.values())
+ if (i < diskSSTables.size())
+ mixedSSTables.add(diskSSTables.get(i));
+
+ return mixedSSTables;
+ }
+
+ private boolean inCorrectLocation(SSTableReader sstable)
+ {
+ if (!cfs.getPartitioner().splitter().isPresent())
+ return true;
+ int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+ Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+
+ Directories.DataDirectory location = locations[directoryIndex];
+ PartitionPosition diskLast = diskBoundaries.get(directoryIndex);
+ // the location we get from directoryIndex is based on the first key in the sstable
+ // now we need to make sure the last key is less than the boundary as well:
+ return sstable.descriptor.directory.getAbsolutePath().startsWith(location.location.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0;
+ }
+
+ @Override
+ public void execute(LifecycleTransaction txn) throws IOException
+ {
+ logger.debug("Relocating {}", txn.originals());
+ AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
+ task.setUserDefined(true);
+ task.setCompactionType(OperationType.RELOCATE);
+ task.execute(metrics);
+ }
+ }, OperationType.RELOCATE);
+ }
+
public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
final Collection<Range<Token>> ranges,
final Refs<SSTableReader> sstables,
@@ -878,9 +950,7 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Cleaning up {}", sstable);
- File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
- if (compactionFileLocation == null)
- throw new IOException("disk full");
+ File compactionFileLocation = sstable.descriptor.directory;
List<SSTableReader> finished;
int nowInSec = FBUtilities.nowInSeconds();
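The filterSSTables step in relocateSSTables above interleaves the candidates disk by disk so the rewrite work is spread across all directories instead of draining one disk at a time. The round-robin interleave it performs, extracted as a generic sketch (not tied to SSTableReader):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class RoundRobinInterleaveSketch
{
    // Take the i-th element of every group in turn so consumers touch all groups evenly.
    static <T> List<T> interleave(Collection<List<T>> groups)
    {
        int maxSize = 0;
        for (List<T> group : groups)
            maxSize = Math.max(maxSize, group.size());

        List<T> mixed = new ArrayList<>();
        for (int i = 0; i < maxSize; i++)
            for (List<T> group : groups)
                if (i < group.size())
                    mixed.add(group.get(i));
        return mixed;
    }
}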
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 7c7e86a..067a0c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
import org.apache.cassandra.index.Index;
+import com.google.common.primitives.Ints;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
@@ -43,19 +47,21 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
/**
* Manages the compaction strategies.
*
- * Currently has two instances of actual compaction strategies - one for repaired data and one for
+ * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
* unrepaired data. This is done to be able to totally separate the different sets of sstables.
*/
+
public class CompactionStrategyManager implements INotificationConsumer
{
private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
private final ColumnFamilyStore cfs;
- private volatile AbstractCompactionStrategy repaired;
- private volatile AbstractCompactionStrategy unrepaired;
+ private volatile List<AbstractCompactionStrategy> repaired = new ArrayList<>();
+ private volatile List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
private volatile boolean enabled = true;
public boolean isActive = true;
private volatile CompactionParams params;
@@ -67,6 +73,7 @@ public class CompactionStrategyManager implements INotificationConsumer
we will use the new compaction parameters.
*/
private CompactionParams schemaCompactionParams;
+ private Directories.DataDirectory[] locations;
public CompactionStrategyManager(ColumnFamilyStore cfs)
{
@@ -75,6 +82,7 @@ public class CompactionStrategyManager implements INotificationConsumer
this.cfs = cfs;
reload(cfs.metadata);
params = cfs.metadata.params.compaction;
+ locations = getDirectories().getWriteableLocations();
enabled = params.isEnabled();
}
@@ -91,20 +99,17 @@ public class CompactionStrategyManager implements INotificationConsumer
maybeReload(cfs.metadata);
- if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
- {
- AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
- if (repairedTask != null)
- return repairedTask;
- return unrepaired.getNextBackgroundTask(gcBefore);
- }
- else
+ List<AbstractCompactionStrategy> strategies = new ArrayList<>(repaired.size() + unrepaired.size());
+ strategies.addAll(repaired);
+ strategies.addAll(unrepaired);
+ Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
+ for (AbstractCompactionStrategy strategy : strategies)
{
- AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
- if (unrepairedTask != null)
- return unrepairedTask;
- return repaired.getNextBackgroundTask(gcBefore);
+ AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
+ if (task != null)
+ return task;
}
+ return null;
}
public boolean isEnabled()
@@ -135,36 +140,78 @@ public class CompactionStrategyManager implements INotificationConsumer
if (sstable.openReason != SSTableReader.OpenReason.EARLY)
getCompactionStrategyFor(sstable).addSSTable(sstable);
}
- repaired.startup();
- unrepaired.startup();
+ repaired.forEach(AbstractCompactionStrategy::startup);
+ unrepaired.forEach(AbstractCompactionStrategy::startup);
}
/**
* return the compaction strategy for the given sstable
*
- * returns differently based on the repaired status
+ * returns differently based on the repaired status and which vnode the compaction strategy belongs to
* @param sstable
* @return
*/
private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
+ int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
- return repaired;
+ return repaired.get(index);
else
- return unrepaired;
+ return unrepaired.get(index);
+ }
+
+ /**
+ * Get the correct compaction strategy for the given sstable. If the first token starts within a disk boundary, we
+ * will add it to that compaction strategy.
+ *
+ * In the case we are upgrading, the first compaction strategy will get most files - we do not care about which disk
+ * the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
+ * sstables in the correct locations and give them to the correct compaction strategy instance.
+ *
+ * @param cfs
+ * @param locations
+ * @param sstable
+ * @return
+ */
+ public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable)
+ {
+ if (!cfs.getPartitioner().splitter().isPresent())
+ return 0;
+
+ Directories.DataDirectory[] directories = locations.getWriteableLocations();
+
+ List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, locations.getWriteableLocations());
+ if (boundaries == null)
+ {
+ // try to figure out location based on sstable directory:
+ for (int i = 0; i < directories.length; i++)
+ {
+ Directories.DataDirectory directory = directories[i];
+ if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+ return i;
+ }
+ return 0;
+ }
+
+ int pos = Collections.binarySearch(boundaries, sstable.first);
+ assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+ return -pos - 1;
+
+
}
public void shutdown()
{
isActive = false;
- repaired.shutdown();
- unrepaired.shutdown();
+ repaired.forEach(AbstractCompactionStrategy::shutdown);
+ unrepaired.forEach(AbstractCompactionStrategy::shutdown);
}
public synchronized void maybeReload(CFMetaData metadata)
{
// compare the old schema configuration to the new one, ignore any locally set changes.
- if (metadata.params.compaction.equals(schemaCompactionParams))
+ if (metadata.params.compaction.equals(schemaCompactionParams) &&
+ Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
return;
reload(metadata);
}
@@ -178,6 +225,11 @@ public class CompactionStrategyManager implements INotificationConsumer
public synchronized void reload(CFMetaData metadata)
{
boolean disabledWithJMX = !enabled && shouldBeEnabled();
+ if (!metadata.params.compaction.equals(schemaCompactionParams))
+ logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+ else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
+ logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+
setStrategy(metadata.params.compaction);
schemaCompactionParams = metadata.params.compaction;
@@ -197,11 +249,13 @@ public class CompactionStrategyManager implements INotificationConsumer
public int getUnleveledSSTables()
{
- if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
{
int count = 0;
- count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
- count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+ for (AbstractCompactionStrategy strategy : repaired)
+ count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
return count;
}
return 0;
@@ -209,13 +263,19 @@ public class CompactionStrategyManager implements INotificationConsumer
public synchronized int[] getSSTableCountPerLevel()
{
- if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
{
int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
- int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
- res = sumArrays(res, repairedCountPerLevel);
- int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
- res = sumArrays(res, unrepairedCountPerLevel);
+ for (AbstractCompactionStrategy strategy : repaired)
+ {
+ int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+ res = sumArrays(res, repairedCountPerLevel);
+ }
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ {
+ int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+ res = sumArrays(res, unrepairedCountPerLevel);
+ }
return res;
}
return null;
@@ -238,103 +298,112 @@ public class CompactionStrategyManager implements INotificationConsumer
public boolean shouldDefragment()
{
- assert repaired.getClass().equals(unrepaired.getClass());
- return repaired.shouldDefragment();
+ assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+ return repaired.get(0).shouldDefragment();
}
public Directories getDirectories()
{
- assert repaired.getClass().equals(unrepaired.getClass());
- return repaired.getDirectories();
+ assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+ return repaired.get(0).getDirectories();
}
public synchronized void handleNotification(INotification notification, Object sender)
{
+ maybeReload(cfs.metadata);
if (notification instanceof SSTableAddedNotification)
{
SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
for (SSTableReader sstable : flushedNotification.added)
- {
- if (sstable.isRepaired())
- repaired.addSSTable(sstable);
- else
- unrepaired.addSSTable(sstable);
- }
+ getCompactionStrategyFor(sstable).addSSTable(sstable);
}
else if (notification instanceof SSTableListChangedNotification)
{
+ // a bit of gymnastics to be able to replace sstables in compaction strategies
+ // we use this to know that a compaction finished and where to start the next compaction in LCS
SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
- Set<SSTableReader> repairedRemoved = new HashSet<>();
- Set<SSTableReader> repairedAdded = new HashSet<>();
- Set<SSTableReader> unrepairedRemoved = new HashSet<>();
- Set<SSTableReader> unrepairedAdded = new HashSet<>();
+
+ Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+ int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+
+ List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+ List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+ for (int i = 0; i < locationSize; i++)
+ {
+ repairedRemoved.add(new HashSet<>());
+ repairedAdded.add(new HashSet<>());
+ unrepairedRemoved.add(new HashSet<>());
+ unrepairedAdded.add(new HashSet<>());
+ }
for (SSTableReader sstable : listChangedNotification.removed)
{
+ int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
- repairedRemoved.add(sstable);
+ repairedRemoved.get(i).add(sstable);
else
- unrepairedRemoved.add(sstable);
+ unrepairedRemoved.get(i).add(sstable);
}
for (SSTableReader sstable : listChangedNotification.added)
{
+ int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
- repairedAdded.add(sstable);
+ repairedAdded.get(i).add(sstable);
else
- unrepairedAdded.add(sstable);
- }
- if (!repairedRemoved.isEmpty())
- {
- repaired.replaceSSTables(repairedRemoved, repairedAdded);
- }
- else
- {
- for (SSTableReader sstable : repairedAdded)
- repaired.addSSTable(sstable);
+ unrepairedAdded.get(i).add(sstable);
}
- if (!unrepairedRemoved.isEmpty())
+ for (int i = 0; i < locationSize; i++)
{
- unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
- }
- else
- {
- for (SSTableReader sstable : unrepairedAdded)
- unrepaired.addSSTable(sstable);
+ if (!repairedRemoved.get(i).isEmpty())
+ repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
+ else
+ {
+ for (SSTableReader sstable : repairedAdded.get(i))
+ repaired.get(i).addSSTable(sstable);
+ }
+ if (!unrepairedRemoved.get(i).isEmpty())
+ unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
+ else
+ {
+ for (SSTableReader sstable : unrepairedAdded.get(i))
+ unrepaired.get(i).addSSTable(sstable);
+ }
}
}
else if (notification instanceof SSTableRepairStatusChanged)
{
for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
{
+ int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
if (sstable.isRepaired())
{
- unrepaired.removeSSTable(sstable);
- repaired.addSSTable(sstable);
+ unrepaired.get(index).removeSSTable(sstable);
+ repaired.get(index).addSSTable(sstable);
}
else
{
- repaired.removeSSTable(sstable);
- unrepaired.addSSTable(sstable);
+ repaired.get(index).removeSSTable(sstable);
+ unrepaired.get(index).addSSTable(sstable);
}
}
}
else if (notification instanceof SSTableDeletingNotification)
{
- SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
- if (sstable.isRepaired())
- repaired.removeSSTable(sstable);
- else
- unrepaired.removeSSTable(sstable);
+ SSTableReader sstable = ((SSTableDeletingNotification) notification).deleting;
+ getCompactionStrategyFor(sstable).removeSSTable(sstable);
}
}
public void enable()
{
if (repaired != null)
- repaired.enable();
+ repaired.forEach(AbstractCompactionStrategy::enable);
if (unrepaired != null)
- unrepaired.enable();
+ unrepaired.forEach(AbstractCompactionStrategy::enable);
// enable this last to make sure the strategies are ready to get calls.
enabled = true;
}
@@ -344,47 +413,64 @@ public class CompactionStrategyManager implements INotificationConsumer
// disable this first to avoid asking disabled strategies for compaction tasks
enabled = false;
if (repaired != null)
- repaired.disable();
+ repaired.forEach(AbstractCompactionStrategy::disable);
if (unrepaired != null)
- unrepaired.disable();
+ unrepaired.forEach(AbstractCompactionStrategy::disable);
}
/**
- * Create ISSTableScanner from the given sstables
+ * Create ISSTableScanners from the given sstables
*
* Delegates the call to the compaction strategies to allow LCS to create a scanner
* @param sstables
- * @param range
+ * @param ranges
* @return
*/
@SuppressWarnings("resource")
public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
- List<SSTableReader> repairedSSTables = new ArrayList<>();
- List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+ assert repaired.size() == unrepaired.size();
+ List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+ List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
+
+ for (int i = 0; i < repaired.size(); i++)
+ {
+ repairedSSTables.add(new HashSet<>());
+ unrepairedSSTables.add(new HashSet<>());
+ }
+
for (SSTableReader sstable : sstables)
{
if (sstable.isRepaired())
- repairedSSTables.add(sstable);
+ repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
else
- unrepairedSSTables.add(sstable);
+ unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
}
- Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
+ List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
for (Range<Token> range : ranges)
{
- AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
- AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+ List<ISSTableScanner> repairedScanners = new ArrayList<>();
+ List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
- for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
+ for (int i = 0; i < repairedSSTables.size(); i++)
+ {
+ if (!repairedSSTables.get(i).isEmpty())
+ repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners);
+ }
+ for (int i = 0; i < unrepairedSSTables.size(); i++)
+ {
+ if (!unrepairedSSTables.get(i).isEmpty())
+ unrepairedScanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners);
+ }
+ for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners))
{
if (!scanners.add(scanner))
scanner.close();
}
}
-
- return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
+ return new AbstractCompactionStrategy.ScannerList(scanners);
}
public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
@@ -394,21 +480,44 @@ public class CompactionStrategyManager implements INotificationConsumer
public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
{
- return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+ Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+ Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
+
+ for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
+ anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
+ return anticompactionGroups;
}
public long getMaxSSTableBytes()
{
- return unrepaired.getMaxSSTableBytes();
+ return unrepaired.get(0).getMaxSSTableBytes();
}
public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
{
+ maybeReload(cfs.metadata);
+ validateForCompaction(txn.originals());
return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
}
+ private void validateForCompaction(Iterable<SSTableReader> input)
+ {
+ SSTableReader firstSSTable = Iterables.getFirst(input, null);
+ assert firstSSTable != null;
+ boolean repaired = firstSSTable.isRepaired();
+ int firstIndex = getCompactionStrategyIndex(cfs, getDirectories(), firstSSTable);
+ for (SSTableReader sstable : input)
+ {
+ if (sstable.isRepaired() != repaired)
+ throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+ if (firstIndex != getCompactionStrategyIndex(cfs, getDirectories(), sstable))
+ throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+ }
+ }
+
public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
{
+ maybeReload(cfs.metadata);
// runWithCompactionsDisabled cancels active compactions and disables them, then we are able
// to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
// sstables are marked the compactions are re-enabled
@@ -419,20 +528,21 @@ public class CompactionStrategyManager implements INotificationConsumer
{
synchronized (CompactionStrategyManager.this)
{
- Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
- Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
-
- if (repairedTasks == null && unrepairedTasks == null)
- return null;
-
- if (repairedTasks == null)
- return unrepairedTasks;
- if (unrepairedTasks == null)
- return repairedTasks;
-
List<AbstractCompactionTask> tasks = new ArrayList<>();
- tasks.addAll(repairedTasks);
- tasks.addAll(unrepairedTasks);
+ for (AbstractCompactionStrategy strategy : repaired)
+ {
+ Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
+ if (task != null)
+ tasks.addAll(task);
+ }
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ {
+ Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
+ if (task != null)
+ tasks.addAll(task);
+ }
+ if (tasks.isEmpty())
+ return null;
return tasks;
}
}
@@ -441,14 +551,18 @@ public class CompactionStrategyManager implements INotificationConsumer
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
{
+ maybeReload(cfs.metadata);
+ validateForCompaction(sstables);
return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
}
public int getEstimatedRemainingTasks()
{
int tasks = 0;
- tasks += repaired.getEstimatedRemainingTasks();
- tasks += unrepaired.getEstimatedRemainingTasks();
+ for (AbstractCompactionStrategy strategy : repaired)
+ tasks += strategy.getEstimatedRemainingTasks();
+ for (AbstractCompactionStrategy strategy : unrepaired)
+ tasks += strategy.getEstimatedRemainingTasks();
return tasks;
}
@@ -460,10 +574,10 @@ public class CompactionStrategyManager implements INotificationConsumer
public String getName()
{
- return unrepaired.getName();
+ return unrepaired.get(0).getName();
}
- public List<AbstractCompactionStrategy> getStrategies()
+ public List<List<AbstractCompactionStrategy>> getStrategies()
{
return Arrays.asList(repaired, unrepaired);
}
@@ -481,12 +595,25 @@ public class CompactionStrategyManager implements INotificationConsumer
private void setStrategy(CompactionParams params)
{
- if (repaired != null)
- repaired.shutdown();
- if (unrepaired != null)
- unrepaired.shutdown();
- repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
- unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
+ repaired.forEach(AbstractCompactionStrategy::shutdown);
+ unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+ repaired.clear();
+ unrepaired.clear();
+
+ if (cfs.getPartitioner().splitter().isPresent())
+ {
+ locations = cfs.getDirectories().getWriteableLocations();
+ for (int i = 0; i < locations.length; i++)
+ {
+ repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ }
+ }
+ else
+ {
+ repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+ }
this.params = params;
}
@@ -510,11 +637,11 @@ public class CompactionStrategyManager implements INotificationConsumer
{
if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
- return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+ return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
}
else
{
- return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+ return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
}
}
}
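Note on the per-disk strategy lookup introduced above: getCompactionStrategyIndex leans on Collections.binarySearch returning a negative insertion point when the sstable's first key falls strictly between two disk boundaries (the boundaries are min/max key bounds, so an exact match should not occur). A minimal standalone sketch of that lookup, using plain long tokens and hypothetical boundary values in place of Cassandra's PartitionPosition:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class DiskBoundaryIndexSketch
    {
        // boundaries holds the upper key bound of each data directory, sorted ascending;
        // a first token sorting between boundaries i-1 and i belongs to directory i.
        static int indexFor(List<Long> boundaries, long firstToken)
        {
            int pos = Collections.binarySearch(boundaries, firstToken);
            // binarySearch returns (-(insertion point) - 1) for a missing key;
            // the insertion point is exactly the directory index we want.
            return pos < 0 ? -pos - 1 : pos;
        }

        public static void main(String[] args)
        {
            List<Long> boundaries = Arrays.asList(100L, 200L, 300L); // hypothetical boundaries
            System.out.println(indexFor(boundaries, 42L));  // 0 -> first data directory
            System.out.println(indexFor(boundaries, 150L)); // 1 -> second data directory
            System.out.println(indexFor(boundaries, 250L)); // 2 -> third data directory
        }
    }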
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index be81c80..6b9fe21 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -230,7 +230,7 @@ public class CompactionTask extends AbstractCompactionTask
LifecycleTransaction transaction,
Set<SSTableReader> nonExpiredSSTables)
{
- return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
+ return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals, getLevel());
}
public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 50f9b71..2dc6ee8 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -85,7 +85,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
*/
private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
- if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+ if (sstables.isEmpty())
return Collections.emptyList();
Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
@@ -212,6 +212,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
{
sstables.remove(sstable);
}
+
/**
* A target time span used for bucketing SSTables based on timestamps.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index b7bf83f..c54b751 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -355,7 +355,7 @@ public class LeveledManifest
Collection<SSTableReader> candidates = getCandidatesFor(0);
if (candidates.isEmpty())
return null;
- return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategyManager().getMaxSSTableBytes());
+ return new CompactionCandidate(candidates, getNextLevel(candidates), maxSSTableSizeInBytes);
}
private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 20e6df2..84a34c9 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -37,7 +37,8 @@ public enum OperationType
STREAM("Stream"),
WRITE("Write"),
VIEW_BUILD("View build"),
- INDEX_SUMMARY("Index summary redistribution");
+ INDEX_SUMMARY("Index summary redistribution"),
+ RELOCATE("Relocate sstables to correct disk");
public final String type;
public final String fileName;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 272c2f8..838b0a1 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -104,11 +104,8 @@ public class Scrubber implements Closeable
List<SSTableReader> toScrub = Collections.singletonList(sstable);
- // Calculate the expected compacted filesize
- this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
- if (destination == null)
- throw new IOException("disk full");
-
+ int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+ this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
this.isCommutative = cfs.metadata.isCounter();
boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
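With this change the scrubber no longer asks for any writeable location big enough for the output; the destination is derived from the sstable's per-disk strategy index, so the scrubbed data stays on the disk that owns its tokens. A rough standalone sketch contrasting the two selection policies (hypothetical mount points; the index would come from getCompactionStrategyIndex as sketched earlier):

    import java.io.File;

    public class ScrubDestinationSketch
    {
        // Old policy (roughly): any directory with enough free space will do.
        static File byFreeSpace(File[] dataDirectories, long expectedSize)
        {
            for (File dir : dataDirectories)
                if (dir.getUsableSpace() >= expectedSize)
                    return dir;
            return null; // previously surfaced as an IOException("disk full")
        }

        // New policy (roughly): the directory is dictated by the sstable's
        // per-disk strategy index, so its tokens never migrate to another disk.
        static File byTokenOwnership(File[] dataDirectories, int strategyIndex)
        {
            return dataDirectories[strategyIndex];
        }

        public static void main(String[] args)
        {
            File[] dirs = { new File("/data1"), new File("/data2") }; // hypothetical mounts
            System.out.println(byTokenOwnership(dirs, 1));            // /data2
        }
    }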
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index f8a8240..e36adf2 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
logger.trace("Compaction buckets are {}", buckets);
- updateEstimatedCompactionsByTasks(buckets);
+ estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets);
List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
if (!mostInteresting.isEmpty())
return mostInteresting;
@@ -282,15 +283,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return new ArrayList<List<T>>(buckets.values());
}
- private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks)
+ public static int getEstimatedCompactionsByTasks(ColumnFamilyStore cfs, List<List<SSTableReader>> tasks)
{
int n = 0;
- for (List<SSTableReader> bucket: tasks)
+ for (List<SSTableReader> bucket : tasks)
{
if (bucket.size() >= cfs.getMinimumCompactionThreshold())
n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
}
- estimatedRemainingTasks = n;
+ return n;
}
public long getMaxSSTableBytes()
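The estimate that was made static above counts, for every size-tiered bucket that reaches the minimum compaction threshold, ceil(bucket size / maximum threshold) pending compactions. A standalone sketch with hypothetical bucket sizes and thresholds:

    import java.util.Arrays;
    import java.util.List;

    public class EstimatedTasksSketch
    {
        // Estimate how many compactions a set of size-tiered buckets implies,
        // given the table's minimum and maximum compaction thresholds.
        static int estimate(List<Integer> bucketSizes, int minThreshold, int maxThreshold)
        {
            int n = 0;
            for (int size : bucketSizes)
            {
                if (size >= minThreshold)
                    n += Math.ceil((double) size / maxThreshold);
            }
            return n;
        }

        public static void main(String[] args)
        {
            // Buckets of 2, 5 and 70 sstables with thresholds 4/32: the first bucket is
            // below the minimum, the second needs one task, the third needs ceil(70/32) = 3.
            System.out.println(estimate(Arrays.asList(2, 5, 70), 4, 32)); // prints 4
        }
    }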
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 0b3b7d0..46023ce 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -18,18 +18,26 @@
package org.apache.cassandra.db.compaction.writers;
+import java.io.File;
import java.util.Collection;
+import java.util.List;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.compaction.CompactionTask;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.concurrent.Transactional;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.service.StorageService;
/**
@@ -38,6 +46,8 @@ import org.apache.cassandra.utils.concurrent.Transactional;
*/
public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
{
+ protected static final Logger logger = LoggerFactory.getLogger(CompactionAwareWriter.class);
+
protected final ColumnFamilyStore cfs;
protected final Directories directories;
protected final Set<SSTableReader> nonExpiredSSTables;
@@ -45,9 +55,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final long maxAge;
protected final long minRepairedAt;
- protected final LifecycleTransaction txn;
protected final SSTableRewriter sstableWriter;
- private boolean isInitialized = false;
+ protected final LifecycleTransaction txn;
+ private final Directories.DataDirectory[] locations;
+ private final List<PartitionPosition> diskBoundaries;
+ private int locationIndex;
public CompactionAwareWriter(ColumnFamilyStore cfs,
Directories directories,
@@ -59,12 +71,15 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
this.cfs = cfs;
this.directories = directories;
this.nonExpiredSSTables = nonExpiredSSTables;
- this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
- this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
- this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
this.txn = txn;
- this.sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
+ estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
+ maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
+ sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
+ minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
+ locations = cfs.getDirectories().getWriteableLocations();
+ diskBoundaries = StorageService.getDiskBoundaries(cfs);
+ locationIndex = -1;
}
@Override
@@ -96,6 +111,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
return sstableWriter.finished();
}
+ public abstract List<SSTableReader> finish(long repairedAt);
+
/**
* estimated number of keys we should write
*/
@@ -104,6 +121,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
return estimatedTotalKeys;
}
+ /**
+ * Writes a partition in an implementation-specific way
+ * @param partition the partition to append
+ * @return true if the partition was written, false otherwise
+ */
public final boolean append(UnfilteredRowIterator partition)
{
maybeSwitchWriter(partition.partitionKey());
@@ -125,9 +147,26 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
*/
protected void maybeSwitchWriter(DecoratedKey key)
{
- if (!isInitialized)
- switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
- isInitialized = true;
+ if (diskBoundaries == null)
+ {
+ if (locationIndex < 0)
+ {
+ Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, OperationType.UNKNOWN));
+ switchCompactionLocation(defaultLocation);
+ locationIndex = 0;
+ }
+ return;
+ }
+
+ if (locationIndex > -1 && key.compareTo(diskBoundaries.get(locationIndex)) < 0)
+ return;
+
+ int prevIdx = locationIndex;
+ while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
+ locationIndex++;
+ if (prevIdx >= 0)
+ logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]);
+ switchCompactionLocation(locations[locationIndex]);
}
/**
@@ -148,13 +187,37 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
/**
* Return a directory where we can expect expectedWriteSize to fit.
+ *
+ * @param sstables the sstables to compact
+ * @param estimatedWriteSize the expected size, in bytes, of the compaction output
+ * @return
*/
- public Directories.DataDirectory getWriteDirectory(long expectedWriteSize)
+ public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize)
{
- Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize);
- if (directory == null)
- throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes");
+ File directory = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (directory == null)
+ directory = sstable.descriptor.directory;
+ if (!directory.equals(sstable.descriptor.directory))
+ logger.trace("All sstables not from the same disk - putting results in {}", directory);
+ }
+ Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(directory);
+ if (d != null)
+ {
+ if (d.getAvailableSpace() < estimatedWriteSize)
+ throw new RuntimeException(String.format("Not enough space to write %d bytes to %s (%d bytes available)", estimatedWriteSize, d.location, d.getAvailableSpace()));
+ logger.trace("putting compaction results in {}", directory);
+ return d;
+ }
+ d = getDirectories().getWriteableLocation(estimatedWriteSize);
+ if (d == null)
+ throw new RuntimeException("Not enough disk space to store "+estimatedWriteSize+" bytes");
+ return d;
+ }
- return directory;
+ public CompactionAwareWriter setRepairedAt(long repairedAt)
+ {
+ this.sstableWriter.setRepairedAt(repairedAt);
+ return this;
}
}
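The boundary handling added to maybeSwitchWriter assumes partitions arrive in token order: the writer keeps an index into the sorted per-disk boundaries and only moves it forward when a key crosses the current boundary, switching sstable writers at each crossing. A simplified standalone sketch of that switching logic, with long tokens and hypothetical boundaries standing in for PartitionPosition and DataDirectory:

    import java.util.Arrays;
    import java.util.List;

    public class BoundarySwitchSketch
    {
        private final List<Long> diskBoundaries; // upper bound per directory, ascending
        private int locationIndex = -1;

        BoundarySwitchSketch(List<Long> diskBoundaries)
        {
            this.diskBoundaries = diskBoundaries;
        }

        // Called once per partition with keys in non-decreasing token order; returns the
        // directory index the partition should go to, advancing past any boundaries the
        // key has crossed (a real writer would open a new sstable on each advance).
        int directoryFor(long key)
        {
            if (locationIndex > -1 && key < diskBoundaries.get(locationIndex))
                return locationIndex;
            while (locationIndex == -1 || key > diskBoundaries.get(locationIndex))
                locationIndex++;
            return locationIndex;
        }

        public static void main(String[] args)
        {
            BoundarySwitchSketch s = new BoundarySwitchSketch(Arrays.asList(100L, 200L, 300L));
            System.out.println(s.directoryFor(10L));  // 0
            System.out.println(s.directoryFor(150L)); // 1
            System.out.println(s.directoryFor(290L)); // 2
        }
    }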