You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/11/24 13:24:12 UTC

[2/3] cassandra git commit: Cache disk boundaries

Cache disk boundaries

Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-13215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14e46e46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14e46e46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14e46e46

Branch: refs/heads/trunk
Commit: 14e46e462cfee15cd06419ee81eb6d9571b6805e
Parents: d2bc37f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 27 14:27:36 2017 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Nov 24 14:18:04 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/BlacklistedDirectories.java    |  10 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 ++
 .../org/apache/cassandra/db/DiskBoundaries.java |  71 +++++++++
 .../cassandra/db/DiskBoundaryManager.java       | 153 +++++++++++++++++++
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +
 src/java/org/apache/cassandra/db/Memtable.java  |  21 +--
 .../db/compaction/CompactionManager.java        |   8 +-
 .../compaction/CompactionStrategyManager.java   |  42 ++---
 .../cassandra/db/compaction/Scrubber.java       |   2 +-
 .../writers/CompactionAwareWriter.java          |  13 +-
 .../sstable/format/RangeAwareSSTableWriter.java |  13 +-
 .../cassandra/service/StorageService.java       |  58 -------
 .../cassandra/db/DiskBoundaryManagerTest.java   | 124 +++++++++++++++
 14 files changed, 421 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3fc934e..fc18dc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Cache disk boundaries (CASSANDRA-13215)
  * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
  * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
  * Update jackson JSON jars (CASSANDRA-13949)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index b8a5914..f090013 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -42,6 +43,8 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
     private final Set<File> unreadableDirectories = new CopyOnWriteArraySet<File>();
     private final Set<File> unwritableDirectories = new CopyOnWriteArraySet<File>();
 
+    private static final AtomicInteger directoriesVersion = new AtomicInteger();
+
     private BlacklistedDirectories()
     {
         // Register this instance with JMX
@@ -89,6 +92,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
         File directory = getDirectory(path);
         if (instance.unreadableDirectories.add(directory))
         {
+            directoriesVersion.incrementAndGet();
             logger.warn("Blacklisting {} for reads", directory);
             return directory;
         }
@@ -106,12 +110,18 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
         File directory = getDirectory(path);
         if (instance.unwritableDirectories.add(directory))
         {
+            directoriesVersion.incrementAndGet();
             logger.warn("Blacklisting {} for writes", directory);
             return directory;
         }
         return null;
     }
 
+    public static int getDirectoriesVersion()
+    {
+        return directoriesVersion.get(); // incremented each time a directory is newly blacklisted for reads or writes
+    }
+
     /**
      * Testing only!
      * Clear the set of unwritable directories.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c9514ca..6077b8d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -250,6 +250,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private volatile boolean compactionSpaceCheck = true;
 
+    @VisibleForTesting
+    final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager();
+
     public static void shutdownPostFlushExecutor() throws InterruptedException
     {
         postFlushExecutor.shutdown();
@@ -2649,4 +2652,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         return getIfExists(tableId).metric;
     }
+
+    public DiskBoundaries getDiskBoundaries()
+    {
+        return diskBoundaryManager.getDiskBoundaries(this);
+    }
+
+    public void invalidateDiskBoundaries()
+    {
+        diskBoundaryManager.invalidate();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
new file mode 100644
index 0000000..ba5a093
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+public class DiskBoundaries
+{
+    public final List<Directories.DataDirectory> directories;
+    public final ImmutableList<PartitionPosition> positions;
+    final long ringVersion;
+    final int directoriesVersion;
+    // a null directories array or positions list is stored as null; anything else is copied into an ImmutableList
+    DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+    {
+        this.directories = directories == null ? null : ImmutableList.copyOf(directories);
+        this.positions = positions == null ? null : ImmutableList.copyOf(positions);
+        this.ringVersion = ringVersion;
+        this.directoriesVersion = diskVersion;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        DiskBoundaries that = (DiskBoundaries) o;
+        // directories and positions may both be null (see the constructor), so both are compared null-safely
+        if (ringVersion != that.ringVersion) return false;
+        if (directoriesVersion != that.directoriesVersion) return false;
+        if (directories != null ? !directories.equals(that.directories) : that.directories != null) return false;
+        return positions != null ? positions.equals(that.positions) : that.positions == null;
+    }
+
+    public int hashCode()
+    {
+        int result = directories != null ? directories.hashCode() : 0;
+        result = 31 * result + (positions != null ? positions.hashCode() : 0);
+        result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32));
+        result = 31 * result + directoriesVersion;
+        return result;
+    }
+
+    public String toString()
+    {
+        return "DiskBoundaries{" +
+               "directories=" + directories +
+               ", positions=" + positions +
+               ", ringVersion=" + ringVersion +
+               ", directoriesVersion=" + directoriesVersion +
+               '}';
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
new file mode 100644
index 0000000..7872554
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Splitter;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DiskBoundaryManager
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class);
+    private volatile DiskBoundaries diskBoundaries;
+
+    public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
+        // read the volatile field once and only ever test/return that local copy: invalidate() can null the field
+        // at any time, and re-reading the field for the check could let a null local copy be returned.
+        // it is ok to race, compaction will move any incorrect tokens to their correct places
+        DiskBoundaries db = diskBoundaries;
+        if (isOutOfDate(db))
+        {
+            synchronized (this)
+            {
+                db = diskBoundaries;
+                if (isOutOfDate(db))
+                {
+                    logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+                    DiskBoundaries oldBoundaries = db;
+                    db = diskBoundaries = getDiskBoundaryValue(cfs);
+                    logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, db, cfs.keyspace.getName(), cfs.getTableName());
+                }
+            }
+        }
+        return db;
+    }
+
+    /**
+     * Checks whether the given disk boundaries are out of date: either not set (null) or built for a ringVersion/directoriesVersion that differs from the current one
+     */
+    private boolean isOutOfDate(DiskBoundaries db)
+    {
+        if (db == null)
+            return true;
+        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+        return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+    }
+
+    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
+    {
+        Collection<Range<Token>> localRanges;
+        // capture the ring version together with the local ranges, retrying if the version changed in between
+        long ringVersion;
+        TokenMetadata tmd;
+        do
+        {
+            tmd = StorageService.instance.getTokenMetadata();
+            ringVersion = tmd.getRingVersion();
+            if (StorageService.instance.isBootstrapMode())
+            {
+                localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
+            }
+            else
+            {
+                // The reason we use the future settled TMD is that if we decommission a node, we want to stream
+                // from that node to the correct location on disk; if we didn't, we would put new files in the wrong places.
+                // We do this to minimize the amount of data we need to move in rebalancedisks once everything has settled
+                localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress());
+            }
+            logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion);
+        }
+        while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that
+                                                     // it might have changed before we calculated localRanges - recalculate
+
+        int directoriesVersion;
+        Directories.DataDirectory[] dirs;
+        do
+        {
+            directoriesVersion = BlacklistedDirectories.getDirectoriesVersion();
+            dirs = cfs.getDirectories().getWriteableLocations();
+        }
+        while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed while reading dirs we need to re-read them
+        // no local ranges to split - return boundaries with null positions
+        if (localRanges == null || localRanges.isEmpty())
+            return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
+
+        List<Range<Token>> sortedLocalRanges = Range.sort(localRanges);
+
+        List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs);
+        return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
+    }
+
+    /**
+     * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not.
+     *
+     * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
+     * getDiskBoundaries(..).get(0) should be on the first disk, everything between get(0) and get(1) on the second disk
+     * etc.
+     *
+     * The final entry in the returned list will always be the upper key bound of the partitioner's maximum token
+     */
+    private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+    {
+        assert partitioner.splitter().isPresent();
+        Splitter splitter = partitioner.splitter().get();
+        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
+        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges);
+        // If we can't split by ranges, split evenly to ensure utilisation of all disks
+        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
+            boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false);
+
+        List<PartitionPosition> diskBoundaries = new ArrayList<>();
+        for (int i = 0; i < boundaries.size() - 1; i++)
+            diskBoundaries.add(boundaries.get(i).maxKeyBound());
+        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
+        return diskBoundaries;
+    }
+
+    public void invalidate()
+    {
+        diskBoundaries = null; // isOutOfDate() treats a null cache as out of date, so it is recomputed on next use
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index cffdb80..01fd451 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -344,6 +344,8 @@ public class Keyspace
                                                                                     StorageService.instance.getTokenMetadata(),
                                                                                     DatabaseDescriptor.getEndpointSnitch(),
                                                                                     ksm.params.replication.options);
+        logger.debug("New replication strategy instance - invalidating disk boundary cache");
+        columnFamilyStores.values().forEach(ColumnFamilyStore::invalidateDiskBoundaries);
     }
 
     // best invoked on the compaction mananger.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e0b27fa..cf04016 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -54,7 +53,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.HeapPool;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
@@ -297,20 +295,17 @@ public class Memtable implements Comparable<Memtable>
 
     public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
     {
-        List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
-
-        if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
-            return Collections.singletonList(new FlushRunnable(txn));
-
-        return createFlushRunnables(localRanges, txn);
+        return createFlushRunnables(txn);
     }
 
-    private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+    private List<FlushRunnable> createFlushRunnables(LifecycleTransaction txn)
     {
-        assert cfs.getPartitioner().splitter().isPresent();
+        DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
+        List<PartitionPosition> boundaries = diskBoundaries.positions;
+        List<Directories.DataDirectory> locations = diskBoundaries.directories;
+        if (boundaries == null)
+            return Collections.singletonList(new FlushRunnable(txn));
 
-        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
         List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
         PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
         try
@@ -318,7 +313,7 @@ public class Memtable implements Comparable<Memtable>
             for (int i = 0; i < boundaries.size(); i++)
             {
                 PartitionPosition t = boundaries.get(i);
-                runnables.add(new FlushRunnable(rangeStart, t, locations[i], txn));
+                runnables.add(new FlushRunnable(rangeStart, t, locations.get(i), txn));
                 rangeStart = t;
             }
             return runnables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 518e3b5..0a2b461 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -517,9 +517,7 @@ public class CompactionManager implements CompactionManagerMBean
             return AllSSTableOpStatus.ABORTED;
         }
 
-        final List<Range<Token>> localRanges = Range.sort(r);
-        final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+        final List<PartitionPosition> diskBoundaries = cfs.getDiskBoundaries().positions;
 
         return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
         {
@@ -531,7 +529,7 @@ public class CompactionManager implements CompactionManagerMBean
                 transaction.cancel(Sets.difference(originals, needsRelocation));
 
                 Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
-                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
+                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
 
                 int maxSize = 0;
                 for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -551,7 +549,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 if (!cfs.getPartitioner().splitter().isPresent())
                     return true;
-                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
                 Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
 
                 Directories.DataDirectory location = locations[directoryIndex];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 94def2a..6305096 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -25,6 +25,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.index.Index;
 import com.google.common.primitives.Ints;
 
@@ -36,7 +38,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
@@ -49,7 +50,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.StorageService;
 
 /**
  * Manages the compaction strategies.
@@ -207,7 +207,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
     {
-        int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+        int index = getCompactionStrategyIndex(cfs, sstable);
         readLock.lock();
         try
         {
@@ -231,30 +231,30 @@ public class CompactionStrategyManager implements INotificationConsumer
      * sstables in the correct locations and give them to the correct compaction strategy instance.
      *
      * @param cfs
-     * @param locations
      * @param sstable
      * @return
      */
-    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable)
+    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
     {
         if (!cfs.getPartitioner().splitter().isPresent())
             return 0;
 
-        Directories.DataDirectory[] directories = locations.getWriteableLocations();
-        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories);
-        if (boundaries == null)
+        DiskBoundaries boundaries = cfs.getDiskBoundaries();
+        List<Directories.DataDirectory> directories = boundaries.directories;
+
+        if (boundaries.positions == null)
         {
             // try to figure out location based on sstable directory:
-            for (int i = 0; i < directories.length; i++)
+            for (int i = 0; i < directories.size(); i++)
             {
-                Directories.DataDirectory directory = directories[i];
+                Directories.DataDirectory directory = directories.get(i);
                 if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
                     return i;
             }
             return 0;
         }
 
-        int pos = Collections.binarySearch(boundaries, sstable.first);
+        int pos = Collections.binarySearch(boundaries.positions, sstable.first);
         assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
         return -pos - 1;
     }
@@ -449,7 +449,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
         for (SSTableReader sstable : removed)
         {
-            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            int i = getCompactionStrategyIndex(cfs, sstable);
             if (sstable.isRepaired())
                 repairedRemoved.get(i).add(sstable);
             else
@@ -457,7 +457,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         for (SSTableReader sstable : added)
         {
-            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            int i = getCompactionStrategyIndex(cfs, sstable);
             if (sstable.isRepaired())
                 repairedAdded.get(i).add(sstable);
             else
@@ -494,7 +494,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             for (SSTableReader sstable : sstables)
             {
-                int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+                int index = getCompactionStrategyIndex(cfs, sstable);
                 if (sstable.isRepaired())
                 {
                     unrepaired.get(index).removeSSTable(sstable);
@@ -608,9 +608,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         for (SSTableReader sstable : sstables)
         {
             if (sstable.isRepaired())
-                repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+                repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
             else
-                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
         }
 
         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
@@ -647,7 +647,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
-            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
             Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
 
             for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -685,12 +685,12 @@ public class CompactionStrategyManager implements INotificationConsumer
         SSTableReader firstSSTable = Iterables.getFirst(input, null);
         assert firstSSTable != null;
         boolean repaired = firstSSTable.isRepaired();
-        int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
+        int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
         for (SSTableReader sstable : input)
         {
             if (sstable.isRepaired() != repaired)
                 throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-            if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
+            if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
                 throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
         }
     }
@@ -752,11 +752,11 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
                                                                          .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
-                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
 
             Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
                                                                            .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
-                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
 
 
             for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0007e30..b1f2e9f 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
-        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
         this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
         this.isCommutative = cfs.metadata.isCounter();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 205aebe..d2f816b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.CompactionTask;
@@ -38,7 +39,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Transactional;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.service.StorageService;
 
 
 /**
@@ -58,7 +58,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
     protected final SSTableRewriter sstableWriter;
     protected final LifecycleTransaction txn;
-    private final Directories.DataDirectory[] locations;
+    private final List<Directories.DataDirectory> locations;
     private final List<PartitionPosition> diskBoundaries;
     private int locationIndex;
 
@@ -88,8 +88,9 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
         minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
-        locations = cfs.getDirectories().getWriteableLocations();
-        diskBoundaries = StorageService.getDiskBoundaries(cfs);
+        DiskBoundaries db = cfs.getDiskBoundaries();
+        diskBoundaries = db.positions;
+        locations = db.directories;
         locationIndex = -1;
     }
 
@@ -174,8 +175,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
             locationIndex++;
         if (prevIdx >= 0)
-            logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]);
-        switchCompactionLocation(locations[locationIndex]);
+            logger.debug("Switching write location from {} to {}", locations.get(prevIdx), locations.get(locationIndex));
+        switchCompactionLocation(locations.get(locationIndex));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 3665da7..353aacb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -26,19 +26,19 @@ import java.util.UUID;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class RangeAwareSSTableWriter implements SSTableMultiWriter
 {
     private final List<PartitionPosition> boundaries;
-    private final Directories.DataDirectory[] directories;
+    private final List<Directories.DataDirectory> directories;
     private final int sstableLevel;
     private final long estimatedKeys;
     private final long repairedAt;
@@ -53,15 +53,16 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
 
     public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
     {
-        directories = cfs.getDirectories().getWriteableLocations();
+        DiskBoundaries db = cfs.getDiskBoundaries();
+        directories = db.directories;
         this.sstableLevel = sstableLevel;
         this.cfs = cfs;
-        this.estimatedKeys = estimatedKeys / directories.length;
+        this.estimatedKeys = estimatedKeys / directories.size();
         this.repairedAt = repairedAt;
         this.format = format;
         this.txn = txn;
         this.header = header;
-        boundaries = StorageService.getDiskBoundaries(cfs, directories);
+        boundaries = db.positions;
         if (boundaries == null)
         {
             Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
@@ -90,7 +91,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
             if (currentWriter != null)
                 finishedWriters.add(currentWriter);
 
-            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
+            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5b4e552..e93430b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5205,62 +5205,4 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
         logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB);
     }
-
-    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories)
-    {
-        if (!cfs.getPartitioner().splitter().isPresent())
-            return null;
-
-        Collection<Range<Token>> lr;
-
-        if (StorageService.instance.isBootstrapMode())
-        {
-            lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
-        }
-        else
-        {
-            // Reason we use use the future settled TMD is that if we decommission a node, we want to stream
-            // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
-            // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
-            TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled();
-            lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress());
-        }
-
-        if (lr == null || lr.isEmpty())
-            return null;
-        List<Range<Token>> localRanges = Range.sort(lr);
-
-        return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories);
-    }
-
-    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs)
-    {
-        return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations());
-    }
-
-    /**
-     * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not.
-     *
-     * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
-     * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk
-     * etc.
-     *
-     * The final entry in the returned list will always be the partitioner maximum tokens upper key bound
-     */
-    public static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
-    {
-        assert partitioner.splitter().isPresent();
-        Splitter splitter = partitioner.splitter().get();
-        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
-        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, dontSplitRanges);
-        // If we can't split by ranges, split evenly to ensure utilisation of all disks
-        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
-            boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, false);
-
-        List<PartitionPosition> diskBoundaries = new ArrayList<>();
-        for (int i = 0; i < boundaries.size() - 1; i++)
-            diskBoundaries.add(boundaries.get(i).maxKeyBound());
-        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
-        return diskBoundaries;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
new file mode 100644
index 0000000..de79959
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DiskBoundaryManagerTest extends CQLTester
+{
+    private DiskBoundaryManager dbm;
+    private MockCFS mock;
+    private Directories dirs;
+
+    @Before
+    public void setup()
+    {
+        BlacklistedDirectories.clearUnwritableUnsafe();
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress());
+        createTable("create table %s (id int primary key, x text)");
+        dbm = getCurrentColumnFamilyStore().diskBoundaryManager;
+        dirs = new Directories(getCurrentColumnFamilyStore().metadata, Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                                                                          new Directories.DataDirectory(new File("/tmp/2")),
+                                                                                          new Directories.DataDirectory(new File("/tmp/3"))));
+        mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+    }
+
+    @Test
+    public void getBoundariesTest()
+    {
+        DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(3, dbv.positions.size());
+        assertEquals(dbv.directories, dirs.getWriteableLocations());
+    }
+
+    @Test
+    public void blackListTest()
+    {
+        DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(3, dbv.positions.size());
+        assertEquals(dbv.directories, dirs.getWriteableLocations());
+        BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3"));
+        dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(2, dbv.positions.size());
+        Assert.assertEquals(Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                        new Directories.DataDirectory(new File("/tmp/2"))),
+                                 dbv.directories);
+    }
+
+    @Test
+    public void updateTokensTest() throws UnknownHostException
+    {
+        DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+        StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10"));
+        DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+        assertFalse(dbv1.equals(dbv2));
+    }
+
+    @Test
+    public void alterKeyspaceTest() throws Throwable
+    {
+        DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+        // reference comparison (==) on purpose - we only want to verify that a new instance has been cached
+        assertFalse(dbv1 == dbv2);
+        DiskBoundaries dbv3 = dbm.getDiskBoundaries(mock);
+        assertTrue(dbv2 == dbv3);
+
+    }
+
+    private static void assertEquals(List<Directories.DataDirectory> dir1, Directories.DataDirectory[] dir2)
+    {
+        if (dir1.size() != dir2.length)
+            fail();
+        for (int i = 0; i < dir2.length; i++)
+        {
+            if (!dir1.get(i).equals(dir2[i]))
+                fail();
+        }
+    }
+
+    // ColumnFamilyStore subclass that exists only so the test can supply its own data directories
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org