You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/11/24 13:24:12 UTC
[2/3] cassandra git commit: Cache disk boundaries
Cache disk boundaries
Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-13215
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14e46e46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14e46e46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14e46e46
Branch: refs/heads/trunk
Commit: 14e46e462cfee15cd06419ee81eb6d9571b6805e
Parents: d2bc37f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 27 14:27:36 2017 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Nov 24 14:18:04 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/BlacklistedDirectories.java | 10 ++
.../apache/cassandra/db/ColumnFamilyStore.java | 13 ++
.../org/apache/cassandra/db/DiskBoundaries.java | 71 +++++++++
.../cassandra/db/DiskBoundaryManager.java | 153 +++++++++++++++++++
src/java/org/apache/cassandra/db/Keyspace.java | 2 +
src/java/org/apache/cassandra/db/Memtable.java | 21 +--
.../db/compaction/CompactionManager.java | 8 +-
.../compaction/CompactionStrategyManager.java | 42 ++---
.../cassandra/db/compaction/Scrubber.java | 2 +-
.../writers/CompactionAwareWriter.java | 13 +-
.../sstable/format/RangeAwareSSTableWriter.java | 13 +-
.../cassandra/service/StorageService.java | 58 -------
.../cassandra/db/DiskBoundaryManagerTest.java | 124 +++++++++++++++
14 files changed, 421 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3fc934e..fc18dc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.2
+ * Cache disk boundaries (CASSANDRA-13215)
* Add asm jar to build.xml for maven builds (CASSANDRA-11193)
* Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
* Update jackson JSON jars (CASSANDRA-13949)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index b8a5914..f090013 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -42,6 +43,8 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
private final Set<File> unreadableDirectories = new CopyOnWriteArraySet<File>();
private final Set<File> unwritableDirectories = new CopyOnWriteArraySet<File>();
+ private static final AtomicInteger directoriesVersion = new AtomicInteger();
+
private BlacklistedDirectories()
{
// Register this instance with JMX
@@ -89,6 +92,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
File directory = getDirectory(path);
if (instance.unreadableDirectories.add(directory))
{
+ directoriesVersion.incrementAndGet();
logger.warn("Blacklisting {} for reads", directory);
return directory;
}
@@ -106,12 +110,18 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
File directory = getDirectory(path);
if (instance.unwritableDirectories.add(directory))
{
+ directoriesVersion.incrementAndGet();
logger.warn("Blacklisting {} for writes", directory);
return directory;
}
return null;
}
+ public static int getDirectoriesVersion()
+ {
+ return directoriesVersion.get();
+ }
+
/**
* Testing only!
* Clear the set of unwritable directories.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c9514ca..6077b8d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -250,6 +250,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private volatile boolean compactionSpaceCheck = true;
+ @VisibleForTesting
+ final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager();
+
public static void shutdownPostFlushExecutor() throws InterruptedException
{
postFlushExecutor.shutdown();
@@ -2649,4 +2652,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
return getIfExists(tableId).metric;
}
+
+ public DiskBoundaries getDiskBoundaries()
+ {
+ return diskBoundaryManager.getDiskBoundaries(this);
+ }
+
+ public void invalidateDiskBoundaries()
+ {
+ diskBoundaryManager.invalidate();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
new file mode 100644
index 0000000..ba5a093
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * An immutable snapshot of the writeable data directories of a table and of the partition positions
+ * that divide its local token ranges between those directories.
+ *
+ * The two versions record the token ring and blacklisted-directory state the snapshot was computed
+ * from, which lets {@link DiskBoundaryManager} detect when it is out of date.
+ */
+public class DiskBoundaries
+{
+    /** The data directories; {@code positions.get(i)} is the upper bound for {@code directories.get(i)}. May be null. */
+    public final List<Directories.DataDirectory> directories;
+    /**
+     * Upper bound of each disk, aligned by index with {@link #directories}; null when the partitioner
+     * cannot split ranges or there are no local ranges.
+     */
+    public final ImmutableList<PartitionPosition> positions;
+    /** Token ring version the snapshot was computed from (DiskBoundaryManager passes -1 when the partitioner has no splitter). */
+    final long ringVersion;
+    /** Blacklisted-directories version the snapshot was computed from (-1 when the partitioner has no splitter). */
+    final int directoriesVersion;
+
+    /**
+     * Copies {@code directories} and {@code positions} into immutable lists; either argument may be null.
+     */
+    DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int directoriesVersion)
+    {
+        this.directories = directories == null ? null : ImmutableList.copyOf(directories);
+        this.positions = positions == null ? null : ImmutableList.copyOf(positions);
+        this.ringVersion = ringVersion;
+        this.directoriesVersion = directoriesVersion;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        DiskBoundaries that = (DiskBoundaries) o;
+
+        // directories may be null (see the constructor), so compare it null-safely, like positions
+        return ringVersion == that.ringVersion
+               && directoriesVersion == that.directoriesVersion
+               && Objects.equals(directories, that.directories)
+               && Objects.equals(positions, that.positions);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = directories != null ? directories.hashCode() : 0;
+        result = 31 * result + (positions != null ? positions.hashCode() : 0);
+        result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32));
+        result = 31 * result + directoriesVersion;
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "DiskBoundaries{" +
+               "directories=" + directories +
+               ", positions=" + positions +
+               ", ringVersion=" + ringVersion +
+               ", directoriesVersion=" + directoriesVersion +
+               '}';
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
new file mode 100644
index 0000000..7872554
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Splitter;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Caches the {@link DiskBoundaries} of a single table; each {@link ColumnFamilyStore} owns one instance.
+ *
+ * {@link #getDiskBoundaries(ColumnFamilyStore)} lazily recomputes the cached value after it has been
+ * {@link #invalidate() invalidated}, or when the token ring version or the blacklisted-directories version
+ * it was computed from no longer matches the current one. Refreshes are serialized on this instance.
+ */
+public class DiskBoundaryManager
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class);
+    // null until first computed, and again after invalidate()
+    private volatile DiskBoundaries diskBoundaries;
+
+    /**
+     * Returns the disk boundaries for {@code cfs}, refreshing the cached value first if it is out of date.
+     *
+     * If the partitioner has no splitter, a new, uncached instance without positions is returned instead.
+     * Never returns null.
+     */
+    public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
+
+        // Check and return the same local copy of the volatile field: invalidate() may null the field at
+        // any moment, so re-reading it between the staleness check and the return could hand back null.
+        // It is ok to race with a concurrent refresh - compaction will move any incorrect tokens to their
+        // correct places - but returning null would be bad.
+        DiskBoundaries db = diskBoundaries;
+        if (isOutOfDate(db))
+        {
+            synchronized (this)
+            {
+                db = diskBoundaries;
+                if (isOutOfDate(db))
+                {
+                    logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+                    DiskBoundaries oldBoundaries = db;
+                    db = getDiskBoundaryValue(cfs);
+                    diskBoundaries = db;
+                    logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, db, cfs.keyspace.getName(), cfs.getTableName());
+                }
+            }
+        }
+        return db;
+    }
+
+    /**
+     * Checks whether {@code db} is out of date: not set (null), or computed from a ring version or a
+     * blacklisted-directories version that differs from the current one.
+     */
+    private boolean isOutOfDate(DiskBoundaries db)
+    {
+        if (db == null)
+            return true;
+        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+        return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+    }
+
+    /**
+     * Computes fresh disk boundaries for {@code cfs} from its local ranges and writeable directories.
+     *
+     * Each input is re-read until its version is unchanged across the read, so the versions recorded in
+     * the result match the ranges and directories it was computed from.
+     */
+    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
+    {
+        Collection<Range<Token>> localRanges;
+
+        long ringVersion;
+        TokenMetadata tmd;
+        do
+        {
+            tmd = StorageService.instance.getTokenMetadata();
+            ringVersion = tmd.getRingVersion();
+            if (StorageService.instance.isBootstrapMode())
+            {
+                localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
+            }
+            else
+            {
+                // We use the future, settled TMD because when a node is decommissioned we want to stream
+                // from that node to the correct location on disk; otherwise we would put new files in the
+                // wrong places. This minimizes the amount of data rebalancedisks has to move once
+                // everything has settled.
+                localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress());
+            }
+            logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion);
+        }
+        while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that
+                                                     // it might have changed before we calculated localRanges - recalculate
+
+        int directoriesVersion;
+        Directories.DataDirectory[] dirs;
+        do
+        {
+            directoriesVersion = BlacklistedDirectories.getDirectoriesVersion();
+            dirs = cfs.getDirectories().getWriteableLocations();
+        }
+        while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate
+
+        if (localRanges == null || localRanges.isEmpty())
+            return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
+
+        List<Range<Token>> sortedLocalRanges = Range.sort(localRanges);
+
+        List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs);
+        return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
+    }
+
+    /**
+     * Returns the upper-bound position of each disk for the given sorted local ranges.
+     *
+     * Everything from the partitioner's minimum token up to {@code get(0)} belongs on the first disk,
+     * everything after {@code get(0)} up to {@code get(1)} on the second disk, and so on. The final entry
+     * is always the upper key bound of the partitioner's maximum token.
+     *
+     * With vnodes ({@code num_tokens > 1}) the splitter is first asked not to split individual ranges; if
+     * that yields fewer boundaries than there are directories, the ranges are split evenly instead.
+     */
+    private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+    {
+        assert partitioner.splitter().isPresent();
+        Splitter splitter = partitioner.splitter().get();
+        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
+        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges);
+        // If we can't split by ranges, split evenly to ensure utilisation of all disks
+        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
+            boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false);
+
+        List<PartitionPosition> diskBoundaries = new ArrayList<>();
+        for (int i = 0; i < boundaries.size() - 1; i++)
+            diskBoundaries.add(boundaries.get(i).maxKeyBound());
+        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
+        return diskBoundaries;
+    }
+
+    /**
+     * Drops the cached value so that the next {@link #getDiskBoundaries(ColumnFamilyStore)} call recomputes it.
+     */
+    public void invalidate()
+    {
+        diskBoundaries = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index cffdb80..01fd451 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -344,6 +344,8 @@ public class Keyspace
StorageService.instance.getTokenMetadata(),
DatabaseDescriptor.getEndpointSnitch(),
ksm.params.replication.options);
+ logger.debug("New replication strategy instance - invalidating disk boundary cache");
+ columnFamilyStores.values().forEach(ColumnFamilyStore::invalidateDiskBoundaries);
}
// best invoked on the compaction mananger.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e0b27fa..cf04016 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -54,7 +53,6 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.HeapPool;
import org.apache.cassandra.utils.memory.MemtableAllocator;
@@ -297,20 +295,17 @@ public class Memtable implements Comparable<Memtable>
public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
{
- List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
-
- if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
- return Collections.singletonList(new FlushRunnable(txn));
-
- return createFlushRunnables(localRanges, txn);
+ return createFlushRunnables(txn);
}
- private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+ private List<FlushRunnable> createFlushRunnables(LifecycleTransaction txn)
{
- assert cfs.getPartitioner().splitter().isPresent();
+ DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
+ List<PartitionPosition> boundaries = diskBoundaries.positions;
+ List<Directories.DataDirectory> locations = diskBoundaries.directories;
+ if (boundaries == null)
+ return Collections.singletonList(new FlushRunnable(txn));
- Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
- List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
try
@@ -318,7 +313,7 @@ public class Memtable implements Comparable<Memtable>
for (int i = 0; i < boundaries.size(); i++)
{
PartitionPosition t = boundaries.get(i);
- runnables.add(new FlushRunnable(rangeStart, t, locations[i], txn));
+ runnables.add(new FlushRunnable(rangeStart, t, locations.get(i), txn));
rangeStart = t;
}
return runnables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 518e3b5..0a2b461 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -517,9 +517,7 @@ public class CompactionManager implements CompactionManagerMBean
return AllSSTableOpStatus.ABORTED;
}
- final List<Range<Token>> localRanges = Range.sort(r);
- final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
- final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+ final List<PartitionPosition> diskBoundaries = cfs.getDiskBoundaries().positions;
return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
@@ -531,7 +529,7 @@ public class CompactionManager implements CompactionManagerMBean
transaction.cancel(Sets.difference(originals, needsRelocation));
Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
- CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
+ CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
int maxSize = 0;
for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -551,7 +549,7 @@ public class CompactionManager implements CompactionManagerMBean
{
if (!cfs.getPartitioner().splitter().isPresent())
return true;
- int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+ int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
Directories.DataDirectory location = locations[directoryIndex];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 94def2a..6305096 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -25,6 +25,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.index.Index;
import com.google.common.primitives.Ints;
@@ -36,7 +38,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
@@ -49,7 +50,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.StorageService;
/**
* Manages the compaction strategies.
@@ -207,7 +207,7 @@ public class CompactionStrategyManager implements INotificationConsumer
*/
public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
{
- int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+ int index = getCompactionStrategyIndex(cfs, sstable);
readLock.lock();
try
{
@@ -231,30 +231,30 @@ public class CompactionStrategyManager implements INotificationConsumer
* sstables in the correct locations and give them to the correct compaction strategy instance.
*
* @param cfs
- * @param locations
* @param sstable
* @return
*/
- public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable)
+ public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
{
if (!cfs.getPartitioner().splitter().isPresent())
return 0;
- Directories.DataDirectory[] directories = locations.getWriteableLocations();
- List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories);
- if (boundaries == null)
+ DiskBoundaries boundaries = cfs.getDiskBoundaries();
+ List<Directories.DataDirectory> directories = boundaries.directories;
+
+ if (boundaries.positions == null)
{
// try to figure out location based on sstable directory:
- for (int i = 0; i < directories.length; i++)
+ for (int i = 0; i < directories.size(); i++)
{
- Directories.DataDirectory directory = directories[i];
+ Directories.DataDirectory directory = directories.get(i);
if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
return i;
}
return 0;
}
- int pos = Collections.binarySearch(boundaries, sstable.first);
+ int pos = Collections.binarySearch(boundaries.positions, sstable.first);
assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
return -pos - 1;
}
@@ -449,7 +449,7 @@ public class CompactionStrategyManager implements INotificationConsumer
for (SSTableReader sstable : removed)
{
- int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+ int i = getCompactionStrategyIndex(cfs, sstable);
if (sstable.isRepaired())
repairedRemoved.get(i).add(sstable);
else
@@ -457,7 +457,7 @@ public class CompactionStrategyManager implements INotificationConsumer
}
for (SSTableReader sstable : added)
{
- int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+ int i = getCompactionStrategyIndex(cfs, sstable);
if (sstable.isRepaired())
repairedAdded.get(i).add(sstable);
else
@@ -494,7 +494,7 @@ public class CompactionStrategyManager implements INotificationConsumer
{
for (SSTableReader sstable : sstables)
{
- int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+ int index = getCompactionStrategyIndex(cfs, sstable);
if (sstable.isRepaired())
{
unrepaired.get(index).removeSSTable(sstable);
@@ -608,9 +608,9 @@ public class CompactionStrategyManager implements INotificationConsumer
for (SSTableReader sstable : sstables)
{
if (sstable.isRepaired())
- repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+ repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
else
- unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+ unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
}
List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
@@ -647,7 +647,7 @@ public class CompactionStrategyManager implements INotificationConsumer
readLock.lock();
try
{
- Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+ Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -685,12 +685,12 @@ public class CompactionStrategyManager implements INotificationConsumer
SSTableReader firstSSTable = Iterables.getFirst(input, null);
assert firstSSTable != null;
boolean repaired = firstSSTable.isRepaired();
- int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
+ int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
for (SSTableReader sstable : input)
{
if (sstable.isRepaired() != repaired)
throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
- if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
+ if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
}
}
@@ -752,11 +752,11 @@ public class CompactionStrategyManager implements INotificationConsumer
{
Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
.filter(s -> !s.isMarkedSuspect() && s.isRepaired())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
.filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
- .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+ .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0007e30..b1f2e9f 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
List<SSTableReader> toScrub = Collections.singletonList(sstable);
- int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+ int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
this.isCommutative = cfs.metadata.isCounter();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 205aebe..d2f816b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.compaction.CompactionTask;
@@ -38,7 +39,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Transactional;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.service.StorageService;
/**
@@ -58,7 +58,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
protected final SSTableRewriter sstableWriter;
protected final LifecycleTransaction txn;
- private final Directories.DataDirectory[] locations;
+ private final List<Directories.DataDirectory> locations;
private final List<PartitionPosition> diskBoundaries;
private int locationIndex;
@@ -88,8 +88,9 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
- locations = cfs.getDirectories().getWriteableLocations();
- diskBoundaries = StorageService.getDiskBoundaries(cfs);
+ DiskBoundaries db = cfs.getDiskBoundaries();
+ diskBoundaries = db.positions;
+ locations = db.directories;
locationIndex = -1;
}
@@ -174,8 +175,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
locationIndex++;
if (prevIdx >= 0)
- logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]);
- switchCompactionLocation(locations[locationIndex]);
+ logger.debug("Switching write location from {} to {}", locations.get(prevIdx), locations.get(locationIndex));
+ switchCompactionLocation(locations.get(locationIndex));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 3665da7..353aacb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -26,19 +26,19 @@ import java.util.UUID;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
public class RangeAwareSSTableWriter implements SSTableMultiWriter
{
private final List<PartitionPosition> boundaries;
- private final Directories.DataDirectory[] directories;
+ private final List<Directories.DataDirectory> directories;
private final int sstableLevel;
private final long estimatedKeys;
private final long repairedAt;
@@ -53,15 +53,16 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
{
- directories = cfs.getDirectories().getWriteableLocations();
+ DiskBoundaries db = cfs.getDiskBoundaries();
+ directories = db.directories;
this.sstableLevel = sstableLevel;
this.cfs = cfs;
- this.estimatedKeys = estimatedKeys / directories.length;
+ this.estimatedKeys = estimatedKeys / directories.size();
this.repairedAt = repairedAt;
this.format = format;
this.txn = txn;
this.header = header;
- boundaries = StorageService.getDiskBoundaries(cfs, directories);
+ boundaries = db.positions;
if (boundaries == null)
{
Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
@@ -90,7 +91,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
if (currentWriter != null)
finishedWriters.add(currentWriter);
- Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
+ Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5b4e552..e93430b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5205,62 +5205,4 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB);
}
-
- public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories)
- {
- if (!cfs.getPartitioner().splitter().isPresent())
- return null;
-
- Collection<Range<Token>> lr;
-
- if (StorageService.instance.isBootstrapMode())
- {
- lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
- }
- else
- {
- // Reason we use use the future settled TMD is that if we decommission a node, we want to stream
- // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
- // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
- TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled();
- lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress());
- }
-
- if (lr == null || lr.isEmpty())
- return null;
- List<Range<Token>> localRanges = Range.sort(lr);
-
- return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories);
- }
-
- public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs)
- {
- return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations());
- }
-
- /**
- * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not.
- *
- * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
- * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk
- * etc.
- *
- * The final entry in the returned list will always be the partitioner maximum tokens upper key bound
- */
- public static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
- {
- assert partitioner.splitter().isPresent();
- Splitter splitter = partitioner.splitter().get();
- boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
- List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, dontSplitRanges);
- // If we can't split by ranges, split evenly to ensure utilisation of all disks
- if (dontSplitRanges && boundaries.size() < dataDirectories.length)
- boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, false);
-
- List<PartitionPosition> diskBoundaries = new ArrayList<>();
- for (int i = 0; i < boundaries.size() - 1; i++)
- diskBoundaries.add(boundaries.get(i).maxKeyBound());
- diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
- return diskBoundaries;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
new file mode 100644
index 0000000..de79959
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link DiskBoundaryManager}: the boundaries computed for a table backed by three data
+ * directories, and when a cached {@link DiskBoundaries} instance gets replaced.
+ */
+public class DiskBoundaryManagerTest extends CQLTester
+{
+    private DiskBoundaryManager dbm;
+    private MockCFS mock;
+    private Directories dirs;
+
+    /**
+     * Clears blacklisted directories, gives the local node 10 random tokens, creates a fresh table and
+     * wraps it in a {@link MockCFS} whose data directories are /tmp/1, /tmp/2 and /tmp/3.
+     * <p>
+     * NOTE(review): {@code dbm} belongs to the table registered in the keyspace, not to the mock, yet is
+     * always queried with the mock; presumably so that invalidation routed through the keyspace (see
+     * alterKeyspaceTest) reaches it - confirm. The hard-coded /tmp paths assume a POSIX file system.
+     */
+    @Before
+    public void setup()
+    {
+        BlacklistedDirectories.clearUnwritableUnsafe();
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress());
+        createTable("create table %s (id int primary key, x text)");
+        dbm = getCurrentColumnFamilyStore().diskBoundaryManager;
+        dirs = new Directories(getCurrentColumnFamilyStore().metadata, Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                                                                           new Directories.DataDirectory(new File("/tmp/2")),
+                                                                                           new Directories.DataDirectory(new File("/tmp/3"))));
+        mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+    }
+
+    /** With nothing blacklisted there is one boundary per data directory, in writeable-location order. */
+    @Test
+    public void getBoundariesTest()
+    {
+        DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(3, dbv.positions.size());
+        assertEquals(dbv.directories, dirs.getWriteableLocations());
+    }
+
+    /** Blacklisting a data directory removes it, and one boundary, from the next result. */
+    @Test
+    public void blackListTest()
+    {
+        DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(3, dbv.positions.size());
+        assertEquals(dbv.directories, dirs.getWriteableLocations());
+        BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3"));
+        dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(2, dbv.positions.size());
+        Assert.assertEquals(Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                               new Directories.DataDirectory(new File("/tmp/2"))),
+                            dbv.directories);
+    }
+
+    /** Adding another node's tokens to the ring yields different boundaries. */
+    @Test
+    public void updateTokensTest() throws UnknownHostException
+    {
+        DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        InetAddress otherNode = InetAddress.getByName("127.0.0.10");
+        try
+        {
+            metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), otherNode);
+            DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+            assertFalse(dbv1.equals(dbv2));
+        }
+        finally
+        {
+            // setup() only touches the local node's tokens, so drop the extra node to keep tests independent
+            metadata.removeEndpoint(otherNode);
+        }
+    }
+
+    /**
+     * Altering the keyspace's replication replaces the cached instance; with no further change the
+     * same new instance is then returned again.
+     */
+    @Test
+    public void alterKeyspaceTest() throws Throwable
+    {
+        DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+        // == on purpose - we just want to make sure that there is a new instance cached
+        assertFalse(dbv1 == dbv2);
+        DiskBoundaries dbv3 = dbm.getDiskBoundaries(mock);
+        assertTrue(dbv2 == dbv3);
+    }
+
+    /** Asserts that {@code actual} holds the same directories as {@code expected}, in the same order. */
+    private static void assertEquals(List<Directories.DataDirectory> actual, Directories.DataDirectory[] expected)
+    {
+        if (actual.size() != expected.length)
+            fail("Expected " + expected.length + " directories but got " + actual.size() + ": " + actual);
+        for (int i = 0; i < expected.length; i++)
+        {
+            if (!actual.get(i).equals(expected[i]))
+                fail("Directory mismatch at index " + i + ": expected " + expected[i] + " but got " + actual.get(i));
+        }
+    }
+
+    /** Only exists to override the data directories of an existing table's store. */
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org