You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/11/24 13:24:11 UTC

[1/3] cassandra git commit: Cache disk boundaries

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 d2bc37f43 -> 14e46e462
  refs/heads/trunk 2d2879db7 -> 41904684b


Cache disk boundaries

Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-13215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14e46e46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14e46e46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14e46e46

Branch: refs/heads/cassandra-3.11
Commit: 14e46e462cfee15cd06419ee81eb6d9571b6805e
Parents: d2bc37f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 27 14:27:36 2017 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Nov 24 14:18:04 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/BlacklistedDirectories.java    |  10 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 ++
 .../org/apache/cassandra/db/DiskBoundaries.java |  71 +++++++++
 .../cassandra/db/DiskBoundaryManager.java       | 153 +++++++++++++++++++
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +
 src/java/org/apache/cassandra/db/Memtable.java  |  21 +--
 .../db/compaction/CompactionManager.java        |   8 +-
 .../compaction/CompactionStrategyManager.java   |  42 ++---
 .../cassandra/db/compaction/Scrubber.java       |   2 +-
 .../writers/CompactionAwareWriter.java          |  13 +-
 .../sstable/format/RangeAwareSSTableWriter.java |  13 +-
 .../cassandra/service/StorageService.java       |  58 -------
 .../cassandra/db/DiskBoundaryManagerTest.java   | 124 +++++++++++++++
 14 files changed, 421 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3fc934e..fc18dc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Cache disk boundaries (CASSANDRA-13215)
  * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
  * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
  * Update jackson JSON jars (CASSANDRA-13949)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index b8a5914..f090013 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -42,6 +43,8 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
     private final Set<File> unreadableDirectories = new CopyOnWriteArraySet<File>();
     private final Set<File> unwritableDirectories = new CopyOnWriteArraySet<File>();
 
+    private static final AtomicInteger directoriesVersion = new AtomicInteger();
+
     private BlacklistedDirectories()
     {
         // Register this instance with JMX
@@ -89,6 +92,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
         File directory = getDirectory(path);
         if (instance.unreadableDirectories.add(directory))
         {
+            directoriesVersion.incrementAndGet();
             logger.warn("Blacklisting {} for reads", directory);
             return directory;
         }
@@ -106,12 +110,18 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
         File directory = getDirectory(path);
         if (instance.unwritableDirectories.add(directory))
         {
+            directoriesVersion.incrementAndGet();
             logger.warn("Blacklisting {} for writes", directory);
             return directory;
         }
         return null;
     }
 
+    public static int getDirectoriesVersion()
+    {
+        return directoriesVersion.get();
+    }
+
     /**
      * Testing only!
      * Clear the set of unwritable directories.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c9514ca..6077b8d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -250,6 +250,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private volatile boolean compactionSpaceCheck = true;
 
+    @VisibleForTesting
+    final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager();
+
     public static void shutdownPostFlushExecutor() throws InterruptedException
     {
         postFlushExecutor.shutdown();
@@ -2649,4 +2652,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         return getIfExists(tableId).metric;
     }
+
+    public DiskBoundaries getDiskBoundaries()
+    {
+        return diskBoundaryManager.getDiskBoundaries(this);
+    }
+
+    public void invalidateDiskBoundaries()
+    {
+        diskBoundaryManager.invalidate();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
new file mode 100644
index 0000000..ba5a093
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+public class DiskBoundaries
+{
+    public final List<Directories.DataDirectory> directories;
+    public final ImmutableList<PartitionPosition> positions;
+    final long ringVersion;
+    final int directoriesVersion;
+
+    DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+    {
+        this.directories = directories == null ? null : ImmutableList.copyOf(directories);
+        this.positions = positions == null ? null : ImmutableList.copyOf(positions);
+        this.ringVersion = ringVersion;
+        this.directoriesVersion = diskVersion;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        DiskBoundaries that = (DiskBoundaries) o;
+        if (ringVersion != that.ringVersion) return false;
+        if (directoriesVersion != that.directoriesVersion) return false;
+        // directories may be null (the constructor allows it), so compare null-safely, like positions below
+        if (directories != null ? !directories.equals(that.directories) : that.directories != null) return false;
+        return positions != null ? positions.equals(that.positions) : that.positions == null;
+    }
+
+    public int hashCode()
+    {
+        int result = directories != null ? directories.hashCode() : 0;
+        result = 31 * result + (positions != null ? positions.hashCode() : 0);
+        result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32));
+        result = 31 * result + directoriesVersion;
+        return result;
+    }
+
+    public String toString()
+    {
+        return "DiskBoundaries{" +
+               "directories=" + directories +
+               ", positions=" + positions +
+               ", ringVersion=" + ringVersion +
+               ", directoriesVersion=" + directoriesVersion +
+               '}';
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
new file mode 100644
index 0000000..7872554
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Splitter;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DiskBoundaryManager
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class);
+    private volatile DiskBoundaries diskBoundaries;
+
+    public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
+        // copy the reference to avoid getting nulled out by invalidate() below, and only ever inspect the copy
+        // - it is ok to race, compaction will move any incorrect tokens to their correct places, but
+        // returning null (or a value we already know is out of date) would be bad
+        DiskBoundaries db = diskBoundaries;
+        if (isOutOfDate(db))
+        {
+            synchronized (this)
+            {
+                db = diskBoundaries;
+                if (isOutOfDate(db))
+                {
+                    logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+                    DiskBoundaries oldBoundaries = db;
+                    db = diskBoundaries = getDiskBoundaryValue(cfs);
+                    logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, db, cfs.keyspace.getName(), cfs.getTableName());
+                }
+            }
+        }
+        return db;
+    }
+
+    /**
+     * check if the given disk boundaries are out of date, either because they are not set (null) or because their ringVersion/directoriesVersion no longer match the current ones
+     */
+    private boolean isOutOfDate(DiskBoundaries db)
+    {
+        if (db == null)
+            return true;
+        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+        return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+    }
+
+    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
+    {
+        Collection<Range<Token>> localRanges;
+
+        long ringVersion;
+        TokenMetadata tmd;
+        do
+        {
+            tmd = StorageService.instance.getTokenMetadata();
+            ringVersion = tmd.getRingVersion();
+            if (StorageService.instance.isBootstrapMode())
+            {
+                localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
+            }
+            else
+            {
+                // The reason we use the future settled TMD is that if we decommission a node, we want to stream
+                // from that node to the correct location on disk; if we didn't, we would put new files in the wrong places.
+                // We do this to minimize the amount of data we need to move in rebalancedisks once everything has settled
+                localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress());
+            }
+            logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion);
+        }
+        while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that
+                                                     // it might have changed before we calculated localRanges - recalculate
+
+        int directoriesVersion;
+        Directories.DataDirectory[] dirs;
+        do
+        {
+            directoriesVersion = BlacklistedDirectories.getDirectoriesVersion();
+            dirs = cfs.getDirectories().getWriteableLocations();
+        }
+        while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate
+
+        if (localRanges == null || localRanges.isEmpty())
+            return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
+
+        List<Range<Token>> sortedLocalRanges = Range.sort(localRanges);
+
+        List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs);
+        return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
+    }
+
+    /**
+     * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not.
+     *
+     * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
+     * getDiskBoundaries(..).get(0) should be on the first disk, everything between get(0) and get(1) should be on the
+     * second disk, etc.
+     *
+     * The final entry in the returned list will always be the partitioner maximum token's upper key bound
+     */
+    private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+    {
+        assert partitioner.splitter().isPresent();
+        Splitter splitter = partitioner.splitter().get();
+        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
+        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges);
+        // If we can't split by ranges, split evenly to ensure utilisation of all disks
+        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
+            boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false);
+
+        List<PartitionPosition> diskBoundaries = new ArrayList<>();
+        for (int i = 0; i < boundaries.size() - 1; i++)
+            diskBoundaries.add(boundaries.get(i).maxKeyBound());
+        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
+        return diskBoundaries;
+    }
+
+    public void invalidate()
+    {
+        diskBoundaries = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index cffdb80..01fd451 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -344,6 +344,8 @@ public class Keyspace
                                                                                     StorageService.instance.getTokenMetadata(),
                                                                                     DatabaseDescriptor.getEndpointSnitch(),
                                                                                     ksm.params.replication.options);
+        logger.debug("New replication strategy instance - invalidating disk boundary cache");
+        columnFamilyStores.values().forEach(ColumnFamilyStore::invalidateDiskBoundaries);
     }
 
     // best invoked on the compaction mananger.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e0b27fa..cf04016 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -54,7 +53,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.HeapPool;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
@@ -297,20 +295,17 @@ public class Memtable implements Comparable<Memtable>
 
     public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
     {
-        List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
-
-        if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
-            return Collections.singletonList(new FlushRunnable(txn));
-
-        return createFlushRunnables(localRanges, txn);
+        return createFlushRunnables(txn);
     }
 
-    private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+    private List<FlushRunnable> createFlushRunnables(LifecycleTransaction txn)
     {
-        assert cfs.getPartitioner().splitter().isPresent();
+        DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
+        List<PartitionPosition> boundaries = diskBoundaries.positions;
+        List<Directories.DataDirectory> locations = diskBoundaries.directories;
+        if (boundaries == null)
+            return Collections.singletonList(new FlushRunnable(txn));
 
-        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
         List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
         PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
         try
@@ -318,7 +313,7 @@ public class Memtable implements Comparable<Memtable>
             for (int i = 0; i < boundaries.size(); i++)
             {
                 PartitionPosition t = boundaries.get(i);
-                runnables.add(new FlushRunnable(rangeStart, t, locations[i], txn));
+                runnables.add(new FlushRunnable(rangeStart, t, locations.get(i), txn));
                 rangeStart = t;
             }
             return runnables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 518e3b5..0a2b461 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -517,9 +517,7 @@ public class CompactionManager implements CompactionManagerMBean
             return AllSSTableOpStatus.ABORTED;
         }
 
-        final List<Range<Token>> localRanges = Range.sort(r);
-        final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+        final List<PartitionPosition> diskBoundaries = cfs.getDiskBoundaries().positions;
 
         return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
         {
@@ -531,7 +529,7 @@ public class CompactionManager implements CompactionManagerMBean
                 transaction.cancel(Sets.difference(originals, needsRelocation));
 
                 Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
-                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
+                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
 
                 int maxSize = 0;
                 for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -551,7 +549,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 if (!cfs.getPartitioner().splitter().isPresent())
                     return true;
-                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
                 Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
 
                 Directories.DataDirectory location = locations[directoryIndex];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 94def2a..6305096 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -25,6 +25,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.index.Index;
 import com.google.common.primitives.Ints;
 
@@ -36,7 +38,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
@@ -49,7 +50,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.StorageService;
 
 /**
  * Manages the compaction strategies.
@@ -207,7 +207,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
     {
-        int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+        int index = getCompactionStrategyIndex(cfs, sstable);
         readLock.lock();
         try
         {
@@ -231,30 +231,30 @@ public class CompactionStrategyManager implements INotificationConsumer
      * sstables in the correct locations and give them to the correct compaction strategy instance.
      *
      * @param cfs
-     * @param locations
      * @param sstable
      * @return
      */
-    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable)
+    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
     {
         if (!cfs.getPartitioner().splitter().isPresent())
             return 0;
 
-        Directories.DataDirectory[] directories = locations.getWriteableLocations();
-        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories);
-        if (boundaries == null)
+        DiskBoundaries boundaries = cfs.getDiskBoundaries();
+        List<Directories.DataDirectory> directories = boundaries.directories;
+
+        if (boundaries.positions == null)
         {
             // try to figure out location based on sstable directory:
-            for (int i = 0; i < directories.length; i++)
+            for (int i = 0; i < directories.size(); i++)
             {
-                Directories.DataDirectory directory = directories[i];
+                Directories.DataDirectory directory = directories.get(i);
                 if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
                     return i;
             }
             return 0;
         }
 
-        int pos = Collections.binarySearch(boundaries, sstable.first);
+        int pos = Collections.binarySearch(boundaries.positions, sstable.first);
         assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
         return -pos - 1;
     }
@@ -449,7 +449,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
         for (SSTableReader sstable : removed)
         {
-            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            int i = getCompactionStrategyIndex(cfs, sstable);
             if (sstable.isRepaired())
                 repairedRemoved.get(i).add(sstable);
             else
@@ -457,7 +457,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         for (SSTableReader sstable : added)
         {
-            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            int i = getCompactionStrategyIndex(cfs, sstable);
             if (sstable.isRepaired())
                 repairedAdded.get(i).add(sstable);
             else
@@ -494,7 +494,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             for (SSTableReader sstable : sstables)
             {
-                int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+                int index = getCompactionStrategyIndex(cfs, sstable);
                 if (sstable.isRepaired())
                 {
                     unrepaired.get(index).removeSSTable(sstable);
@@ -608,9 +608,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         for (SSTableReader sstable : sstables)
         {
             if (sstable.isRepaired())
-                repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+                repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
             else
-                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
         }
 
         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
@@ -647,7 +647,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
-            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
             Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
 
             for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -685,12 +685,12 @@ public class CompactionStrategyManager implements INotificationConsumer
         SSTableReader firstSSTable = Iterables.getFirst(input, null);
         assert firstSSTable != null;
         boolean repaired = firstSSTable.isRepaired();
-        int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
+        int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
         for (SSTableReader sstable : input)
         {
             if (sstable.isRepaired() != repaired)
                 throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-            if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
+            if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
                 throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
         }
     }
@@ -752,11 +752,11 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
                                                                          .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
-                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
 
             Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
                                                                            .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
-                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
 
 
             for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0007e30..b1f2e9f 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
-        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
         this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
         this.isCommutative = cfs.metadata.isCounter();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 205aebe..d2f816b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.CompactionTask;
@@ -38,7 +39,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Transactional;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.service.StorageService;
 
 
 /**
@@ -58,7 +58,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
     protected final SSTableRewriter sstableWriter;
     protected final LifecycleTransaction txn;
-    private final Directories.DataDirectory[] locations;
+    private final List<Directories.DataDirectory> locations;
     private final List<PartitionPosition> diskBoundaries;
     private int locationIndex;
 
@@ -88,8 +88,9 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
         minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
-        locations = cfs.getDirectories().getWriteableLocations();
-        diskBoundaries = StorageService.getDiskBoundaries(cfs);
+        DiskBoundaries db = cfs.getDiskBoundaries();
+        diskBoundaries = db.positions;
+        locations = db.directories;
         locationIndex = -1;
     }
 
@@ -174,8 +175,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
             locationIndex++;
         if (prevIdx >= 0)
-            logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]);
-        switchCompactionLocation(locations[locationIndex]);
+            logger.debug("Switching write location from {} to {}", locations.get(prevIdx), locations.get(locationIndex));
+        switchCompactionLocation(locations.get(locationIndex));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 3665da7..353aacb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -26,19 +26,19 @@ import java.util.UUID;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class RangeAwareSSTableWriter implements SSTableMultiWriter
 {
     private final List<PartitionPosition> boundaries;
-    private final Directories.DataDirectory[] directories;
+    private final List<Directories.DataDirectory> directories;
     private final int sstableLevel;
     private final long estimatedKeys;
     private final long repairedAt;
@@ -53,15 +53,16 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
 
     public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
     {
-        directories = cfs.getDirectories().getWriteableLocations();
+        DiskBoundaries db = cfs.getDiskBoundaries();
+        directories = db.directories;
         this.sstableLevel = sstableLevel;
         this.cfs = cfs;
-        this.estimatedKeys = estimatedKeys / directories.length;
+        this.estimatedKeys = estimatedKeys / directories.size();
         this.repairedAt = repairedAt;
         this.format = format;
         this.txn = txn;
         this.header = header;
-        boundaries = StorageService.getDiskBoundaries(cfs, directories);
+        boundaries = db.positions;
         if (boundaries == null)
         {
             Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
@@ -90,7 +91,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
             if (currentWriter != null)
                 finishedWriters.add(currentWriter);
 
-            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
+            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5b4e552..e93430b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5205,62 +5205,4 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
         logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB);
     }
-
-    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories)
-    {
-        if (!cfs.getPartitioner().splitter().isPresent())
-            return null;
-
-        Collection<Range<Token>> lr;
-
-        if (StorageService.instance.isBootstrapMode())
-        {
-            lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
-        }
-        else
-        {
-            // Reason we use use the future settled TMD is that if we decommission a node, we want to stream
-            // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
-            // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
-            TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled();
-            lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress());
-        }
-
-        if (lr == null || lr.isEmpty())
-            return null;
-        List<Range<Token>> localRanges = Range.sort(lr);
-
-        return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories);
-    }
-
-    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs)
-    {
-        return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations());
-    }
-
-    /**
-     * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not.
-     *
-     * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
-     * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk
-     * etc.
-     *
-     * The final entry in the returned list will always be the partitioner maximum tokens upper key bound
-     */
-    public static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
-    {
-        assert partitioner.splitter().isPresent();
-        Splitter splitter = partitioner.splitter().get();
-        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
-        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, dontSplitRanges);
-        // If we can't split by ranges, split evenly to ensure utilisation of all disks
-        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
-            boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, false);
-
-        List<PartitionPosition> diskBoundaries = new ArrayList<>();
-        for (int i = 0; i < boundaries.size() - 1; i++)
-            diskBoundaries.add(boundaries.get(i).maxKeyBound());
-        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
-        return diskBoundaries;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
new file mode 100644
index 0000000..de79959
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DiskBoundaryManagerTest extends CQLTester
+{
+    private DiskBoundaryManager dbm;
+    private MockCFS mock;
+    private Directories dirs;
+
+    @Before
+    public void setup()
+    {
+        BlacklistedDirectories.clearUnwritableUnsafe();
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress());
+        createTable("create table %s (id int primary key, x text)");
+        dbm = getCurrentColumnFamilyStore().diskBoundaryManager;
+        dirs = new Directories(getCurrentColumnFamilyStore().metadata, Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                                                                          new Directories.DataDirectory(new File("/tmp/2")),
+                                                                                          new Directories.DataDirectory(new File("/tmp/3"))));
+        mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+    }
+
+    @Test
+    public void getBoundariesTest()
+    {
+        DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(3, dbv.positions.size());
+        assertEquals(dbv.directories, dirs.getWriteableLocations());
+    }
+
+    @Test
+    public void blackListTest()
+    {
+        DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(3, dbv.positions.size());
+        assertEquals(dbv.directories, dirs.getWriteableLocations());
+        BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3"));
+        dbv = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(2, dbv.positions.size());
+        Assert.assertEquals(Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                        new Directories.DataDirectory(new File("/tmp/2"))),
+                                 dbv.directories);
+    }
+
+    @Test
+    public void updateTokensTest() throws UnknownHostException
+    {
+        DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+        StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10"));
+        DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+        assertFalse(dbv1.equals(dbv2));
+    }
+
+    @Test
+    public void alterKeyspaceTest() throws Throwable
+    {
+        DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+        // == on purpose - we just want to make sure that there is a new instance cached
+        assertFalse(dbv1 == dbv2);
+        DiskBoundaries dbv3 = dbm.getDiskBoundaries(mock);
+        assertTrue(dbv2 == dbv3);
+
+    }
+
+    private static void assertEquals(List<Directories.DataDirectory> dir1, Directories.DataDirectory[] dir2)
+    {
+        if (dir1.size() != dir2.length)
+            fail();
+        for (int i = 0; i < dir2.length; i++)
+        {
+            if (!dir1.get(i).equals(dir2[i]))
+                fail();
+        }
+    }
+
+    // just to be able to override the data directories
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/41904684
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/41904684
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/41904684

Branch: refs/heads/trunk
Commit: 41904684bb5509595d11f008d0851c7ce625e020
Parents: 2d2879d 14e46e4
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Nov 24 14:20:57 2017 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Nov 24 14:20:57 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/BlacklistedDirectories.java    |  10 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 ++
 .../org/apache/cassandra/db/DiskBoundaries.java |  71 +++++++++
 .../cassandra/db/DiskBoundaryManager.java       | 153 +++++++++++++++++++
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +
 src/java/org/apache/cassandra/db/Memtable.java  |  20 +--
 .../db/compaction/CompactionManager.java        |   8 +-
 .../compaction/CompactionStrategyManager.java   |  47 +++---
 .../cassandra/db/compaction/Scrubber.java       |   2 +-
 .../writers/CompactionAwareWriter.java          |  13 +-
 .../sstable/format/RangeAwareSSTableWriter.java |  13 +-
 .../cassandra/service/StorageService.java       |  58 -------
 .../cassandra/db/DiskBoundaryManagerTest.java   | 124 +++++++++++++++
 14 files changed, 422 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 81b3a6e,fc18dc3..4456af5
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,168 -1,5 +1,169 @@@
 +4.0
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
 + * Simplify mx4j configuration (Cassandra-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minumum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction (CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
 + * Bring back maxHintTTL propery (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements (CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 +
 +
  3.11.2
+  * Cache disk boundaries (CASSANDRA-13215)
   * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
   * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
   * Update jackson JSON jars (CASSANDRA-13949)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 10505e6,6305096..1b3ddb5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -24,13 -23,12 +24,14 @@@ import java.util.concurrent.Callable
  import java.util.concurrent.locks.ReentrantReadWriteLock;
  import java.util.stream.Collectors;
  import java.util.stream.Stream;
 +import java.util.function.Supplier;
  
 +import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.Iterables;
  
+ import org.apache.cassandra.db.DiskBoundaries;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.index.Index;
 -import com.google.common.primitives.Ints;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -52,8 -50,6 +52,7 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.notifications.*;
  import org.apache.cassandra.schema.CompactionParams;
  import org.apache.cassandra.service.ActiveRepairService;
- import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.Pair;
  
  /**
   * Manages the compaction strategies.
@@@ -288,68 -239,26 +286,67 @@@ public class CompactionStrategyManager 
          if (!cfs.getPartitioner().splitter().isPresent())
              return 0;
  
-         Directories.DataDirectory[] directories = locations.getWriteableLocations();
-         List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories);
-         if (boundaries == null)
-             return getCompactionStrategyIndex(locations, sstable.descriptor);
+         DiskBoundaries boundaries = cfs.getDiskBoundaries();
+         List<Directories.DataDirectory> directories = boundaries.directories;
 -
+         if (boundaries.positions == null)
 -        {
 -            // try to figure out location based on sstable directory:
 -            for (int i = 0; i < directories.size(); i++)
 -            {
 -                Directories.DataDirectory directory = directories.get(i);
 -                if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
 -                    return i;
 -            }
 -            return 0;
 -        }
++            return getCompactionStrategyIndex(directories, sstable.descriptor);
  
-         int pos = Collections.binarySearch(boundaries, sstable.first);
+         int pos = Collections.binarySearch(boundaries.positions, sstable.first);
          assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
          return -pos - 1;
      }
  
 +    /**
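 +     * Get the compaction strategy index for the descriptor based on the given data directories.
 +     * @param directories the data directories to search, in index order
 +     * @param descriptor descriptor of the sstable whose directory path is matched against {@code directories}
 +     * @return index of the first directory whose absolute path is a prefix of the descriptor's directory, or 0 if none matches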
 +     */
-     private static int getCompactionStrategyIndex(Directories locations, Descriptor descriptor)
++    private static int getCompactionStrategyIndex(List<Directories.DataDirectory> directories, Descriptor descriptor)
 +    {
-          Directories.DataDirectory[] directories = locations.getWriteableLocations();
 +         // try to figure out location based on sstable directory:
-          for (int i = 0; i < directories.length; i++)
++         for (int i = 0; i < directories.size(); i++)
 +         {
-              Directories.DataDirectory directory = directories[i];
++             Directories.DataDirectory directory = directories.get(i);
 +             if (descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
 +                 return i;
 +         }
 +         return 0;
 +    }
 +
 +    @VisibleForTesting
 +    List<AbstractCompactionStrategy> getRepaired()
 +    {
 +        return repaired;
 +    }
 +
 +    @VisibleForTesting
 +    List<AbstractCompactionStrategy> getUnrepaired()
 +    {
 +        return unrepaired;
 +    }
 +
 +    @VisibleForTesting
 +    List<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID)
 +    {
 +        List<AbstractCompactionStrategy> strategies = new ArrayList<>(pendingRepairs.size());
 +        pendingRepairs.forEach(p -> strategies.add(p.get(sessionID)));
 +        return strategies;
 +    }
 +
 +    @VisibleForTesting
 +    Set<UUID> pendingRepairs()
 +    {
 +        Set<UUID> ids = new HashSet<>();
 +        pendingRepairs.forEach(p -> ids.addAll(p.getSessions()));
 +        return ids;
 +    }
 +
 +    public boolean hasDataForPendingRepair(UUID sessionID)
 +    {
 +        return Iterables.any(pendingRepairs, prm -> prm.hasDataForSession(sessionID));
 +    }
 +
      public void shutdown()
      {
          writeLock.lock();
@@@ -553,20 -449,16 +550,20 @@@
  
          for (SSTableReader sstable : removed)
          {
-             int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+             int i = getCompactionStrategyIndex(cfs, sstable);
 -            if (sstable.isRepaired())
 +            if (sstable.isPendingRepair())
 +                pendingRemoved.get(i).add(sstable);
 +            else if (sstable.isRepaired())
                  repairedRemoved.get(i).add(sstable);
              else
                  unrepairedRemoved.get(i).add(sstable);
          }
          for (SSTableReader sstable : added)
          {
-             int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+             int i = getCompactionStrategyIndex(cfs, sstable);
 -            if (sstable.isRepaired())
 +            if (sstable.isPendingRepair())
 +                pendingAdded.get(i).add(sstable);
 +            else if (sstable.isRepaired())
                  repairedAdded.get(i).add(sstable);
              else
                  unrepairedAdded.get(i).add(sstable);
@@@ -613,16 -494,9 +610,16 @@@
          {
              for (SSTableReader sstable : sstables)
              {
-                 int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+                 int index = getCompactionStrategyIndex(cfs, sstable);
 -                if (sstable.isRepaired())
 +                if (sstable.isPendingRepair())
 +                {
 +                    pendingRepairs.get(index).addSSTable(sstable);
 +                    unrepaired.get(index).removeSSTable(sstable);
 +                    repaired.get(index).removeSSTable(sstable);
 +                }
 +                else if (sstable.isRepaired())
                  {
 +                    pendingRepairs.get(index).removeSSTable(sstable);
                      unrepaired.get(index).removeSSTable(sstable);
                      repaired.get(index).addSSTable(sstable);
                  }
@@@ -744,22 -618,6 +741,22 @@@
          readLock.lock();
          try
          {
 +            for (SSTableReader sstable : sstables)
 +            {
-                 int idx = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
++                int idx = getCompactionStrategyIndex(cfs, sstable);
 +                if (sstable.isPendingRepair())
 +                    pendingSSTables.get(idx).add(sstable);
 +                else if (sstable.isRepaired())
 +                    repairedSSTables.get(idx).add(sstable);
 +                else
 +                    unrepairedSSTables.get(idx).add(sstable);
 +            }
 +
 +            for (int i = 0; i < pendingSSTables.size(); i++)
 +            {
 +                if (!pendingSSTables.get(i).isEmpty())
 +                    scanners.addAll(pendingRepairs.get(i).getScanners(pendingSSTables.get(i), ranges));
 +            }
              for (int i = 0; i < repairedSSTables.size(); i++)
              {
                  if (!repairedSSTables.get(i).isEmpty())
@@@ -845,17 -685,13 +842,17 @@@
          SSTableReader firstSSTable = Iterables.getFirst(input, null);
          assert firstSSTable != null;
          boolean repaired = firstSSTable.isRepaired();
-         int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
+         int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
 +        boolean isPending = firstSSTable.isPendingRepair();
 +        UUID pendingRepair = firstSSTable.getSSTableMetadata().pendingRepair;
          for (SSTableReader sstable : input)
          {
              if (sstable.isRepaired() != repaired)
                  throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-             if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
+             if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
                  throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
 +            if (isPending && !pendingRepair.equals(sstable.getSSTableMetadata().pendingRepair))
 +                throw new UnsupportedOperationException("You can't compact sstables from different pending repair sessions");
          }
      }
  
@@@ -922,16 -751,13 +919,16 @@@
          try
          {
              Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
 -                                                                         .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
 +                                                                         .filter(s -> !s.isMarkedSuspect() && s.isRepaired() && !s.isPendingRepair())
-                                                                          .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+                                                                          .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
  
              Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
 -                                                                           .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
 +                                                                           .filter(s -> !s.isMarkedSuspect() && !s.isRepaired() && !s.isPendingRepair())
-                                                                            .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+                                                                            .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
  
 +            Map<Integer, List<SSTableReader>> pendingSSTables = sstables.stream()
 +                                                                        .filter(s -> !s.isMarkedSuspect() && s.isPendingRepair())
-                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
++                                                                        .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
  
              for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
                  ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
@@@ -1084,16 -899,14 +1081,16 @@@
          readLock.lock();
          try
          {
 -            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 -            {
 -                return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
 -            }
 +            // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written
 +            int index = cfs.getPartitioner().splitter().isPresent()
-                         ? getCompactionStrategyIndex(getDirectories(), descriptor)
++                        ? getCompactionStrategyIndex(Arrays.asList(getDirectories().getWriteableLocations()), descriptor)
 +                        : 0;
 +            if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
 +                return pendingRepairs.get(index).getOrCreate(pendingRepair).createSSTableMultiWriter(descriptor, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, collector, header, indexes, txn);
 +            else if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 +                return unrepaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
              else
 -            {
 -                return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
 -            }
 +                return repaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
          }
          finally
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index cd5238f,b1f2e9f..7219595
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -99,9 -98,9 +99,9 @@@ public class Scrubber implements Closea
  
          List<SSTableReader> toScrub = Collections.singletonList(sstable);
  
-         int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+         int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
          this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
 -        this.isCommutative = cfs.metadata.isCounter();
 +        this.isCommutative = cfs.metadata().isCounter();
  
          boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
          this.isIndex = cfs.isIndex();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index e8f7d72,d2f816b..5ddd99c
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@@ -90,9 -88,9 +90,10 @@@ public abstract class CompactionAwareWr
          maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
          sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
          minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
 +        pendingRepair = CompactionTask.getPendingRepair(nonExpiredSSTables);
-         locations = cfs.getDirectories().getWriteableLocations();
-         diskBoundaries = StorageService.getDiskBoundaries(cfs);
+         DiskBoundaries db = cfs.getDiskBoundaries();
+         diskBoundaries = db.positions;
+         locations = db.directories;
          locationIndex = -1;
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 88c60e5,353aacb..f289fe3
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@@ -32,8 -33,6 +33,7 @@@ import org.apache.cassandra.db.lifecycl
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.schema.TableId;
- import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.FBUtilities;
  
  public class RangeAwareSSTableWriter implements SSTableMultiWriter
@@@ -53,14 -51,14 +53,15 @@@
      private final List<SSTableReader> finishedReaders = new ArrayList<>();
      private SSTableMultiWriter currentWriter = null;
  
 -    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
 +    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, UUID pendingRepair, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
      {
-         directories = cfs.getDirectories().getWriteableLocations();
+         DiskBoundaries db = cfs.getDiskBoundaries();
+         directories = db.directories;
          this.sstableLevel = sstableLevel;
          this.cfs = cfs;
-         this.estimatedKeys = estimatedKeys / directories.length;
+         this.estimatedKeys = estimatedKeys / directories.size();
          this.repairedAt = repairedAt;
 +        this.pendingRepair = pendingRepair;
          this.format = format;
          this.txn = txn;
          this.header = header;
@@@ -93,8 -91,8 +94,8 @@@
              if (currentWriter != null)
                  finishedWriters.add(currentWriter);
  
-             Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories[currentIndex]), format);
 -            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
 -            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
++            Descriptor desc = cfs.newSSTableDescriptor(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex)), format);
 +            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, pendingRepair, sstableLevel, header, txn);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41904684/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
index 0000000,de79959..fc7c9a4
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@@ -1,0 -1,124 +1,124 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db;
+ 
+ import java.io.File;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
+ import java.util.List;
+ 
+ import com.google.common.collect.Lists;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.dht.BootStrapper;
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+ 
+ public class DiskBoundaryManagerTest extends CQLTester
+ {
+     private DiskBoundaryManager dbm;
+     private MockCFS mock;
+     private Directories dirs;
+ 
+     @Before
+     public void setup()
+     {
+         BlacklistedDirectories.clearUnwritableUnsafe();
+         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+         metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress());
+         createTable("create table %s (id int primary key, x text)");
+         dbm = getCurrentColumnFamilyStore().diskBoundaryManager;
 -        dirs = new Directories(getCurrentColumnFamilyStore().metadata, Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
 -                                                                                          new Directories.DataDirectory(new File("/tmp/2")),
 -                                                                                          new Directories.DataDirectory(new File("/tmp/3"))));
++        dirs = new Directories(getCurrentColumnFamilyStore().metadata.get(), Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
++                                                                                                new Directories.DataDirectory(new File("/tmp/2")),
++                                                                                                new Directories.DataDirectory(new File("/tmp/3"))));
+         mock = new MockCFS(getCurrentColumnFamilyStore(), dirs);
+     }
+ 
+     @Test
+     public void getBoundariesTest()
+     {
+         DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
+         Assert.assertEquals(3, dbv.positions.size());
+         assertEquals(dbv.directories, dirs.getWriteableLocations());
+     }
+ 
+     @Test
+     public void blackListTest()
+     {
+         DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
+         Assert.assertEquals(3, dbv.positions.size());
+         assertEquals(dbv.directories, dirs.getWriteableLocations());
+         BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3"));
+         dbv = dbm.getDiskBoundaries(mock);
+         Assert.assertEquals(2, dbv.positions.size());
+         Assert.assertEquals(Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                         new Directories.DataDirectory(new File("/tmp/2"))),
+                                  dbv.directories);
+     }
+ 
+     @Test
+     public void updateTokensTest() throws UnknownHostException
+     {
+         DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+         StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10"));
+         DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+         assertFalse(dbv1.equals(dbv2));
+     }
+ 
+     @Test
+     public void alterKeyspaceTest() throws Throwable
+     {
+         DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+         execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+         DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+         // == on purpose - we just want to make sure that there is a new instance cached
+         assertFalse(dbv1 == dbv2);
+         DiskBoundaries dbv3 = dbm.getDiskBoundaries(mock);
+         assertTrue(dbv2 == dbv3);
+ 
+     }
+ 
+     private static void assertEquals(List<Directories.DataDirectory> dir1, Directories.DataDirectory[] dir2)
+     {
+         if (dir1.size() != dir2.length)
+             fail();
+         for (int i = 0; i < dir2.length; i++)
+         {
+             if (!dir1.get(i).equals(dir2[i]))
+                 fail();
+         }
+     }
+ 
+     // just to be able to override the data directories
+     private static class MockCFS extends ColumnFamilyStore
+     {
+         MockCFS(ColumnFamilyStore cfs, Directories dirs)
+         {
+             super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/3] cassandra git commit: Cache disk boundaries

Posted by ma...@apache.org.
Cache disk boundaries

Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-13215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14e46e46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14e46e46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14e46e46

Branch: refs/heads/trunk
Commit: 14e46e462cfee15cd06419ee81eb6d9571b6805e
Parents: d2bc37f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri Oct 27 14:27:36 2017 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Fri Nov 24 14:18:04 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/BlacklistedDirectories.java    |  10 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 ++
 .../org/apache/cassandra/db/DiskBoundaries.java |  71 +++++++++
 .../cassandra/db/DiskBoundaryManager.java       | 153 +++++++++++++++++++
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +
 src/java/org/apache/cassandra/db/Memtable.java  |  21 +--
 .../db/compaction/CompactionManager.java        |   8 +-
 .../compaction/CompactionStrategyManager.java   |  42 ++---
 .../cassandra/db/compaction/Scrubber.java       |   2 +-
 .../writers/CompactionAwareWriter.java          |  13 +-
 .../sstable/format/RangeAwareSSTableWriter.java |  13 +-
 .../cassandra/service/StorageService.java       |  58 -------
 .../cassandra/db/DiskBoundaryManagerTest.java   | 124 +++++++++++++++
 14 files changed, 421 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3fc934e..fc18dc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Cache disk boundaries (CASSANDRA-13215)
  * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
  * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
  * Update jackson JSON jars (CASSANDRA-13949)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index b8a5914..f090013 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -25,6 +25,7 @@ import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -42,6 +43,8 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
     private final Set<File> unreadableDirectories = new CopyOnWriteArraySet<File>();
     private final Set<File> unwritableDirectories = new CopyOnWriteArraySet<File>();
 
+    private static final AtomicInteger directoriesVersion = new AtomicInteger();
+
     private BlacklistedDirectories()
     {
         // Register this instance with JMX
@@ -89,6 +92,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
         File directory = getDirectory(path);
         if (instance.unreadableDirectories.add(directory))
         {
+            directoriesVersion.incrementAndGet();
             logger.warn("Blacklisting {} for reads", directory);
             return directory;
         }
@@ -106,12 +110,18 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
         File directory = getDirectory(path);
         if (instance.unwritableDirectories.add(directory))
         {
+            directoriesVersion.incrementAndGet();
             logger.warn("Blacklisting {} for writes", directory);
             return directory;
         }
         return null;
     }
 
+    public static int getDirectoriesVersion()
+    {
+        return directoriesVersion.get();
+    }
+
     /**
      * Testing only!
      * Clear the set of unwritable directories.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c9514ca..6077b8d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -250,6 +250,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private volatile boolean compactionSpaceCheck = true;
 
+    @VisibleForTesting
+    final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager();
+
     public static void shutdownPostFlushExecutor() throws InterruptedException
     {
         postFlushExecutor.shutdown();
@@ -2649,4 +2652,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         return getIfExists(tableId).metric;
     }
+
+    public DiskBoundaries getDiskBoundaries()
+    {
+        return diskBoundaryManager.getDiskBoundaries(this);
+    }
+
+    public void invalidateDiskBoundaries()
+    {
+        diskBoundaryManager.invalidate();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
new file mode 100644
index 0000000..ba5a093
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Immutable snapshot of a set of data directories and the partition positions that delimit them, stamped
+ * with the ring version and directories version supplied at construction.
+ */
+public class DiskBoundaries
+{
+    public final List<Directories.DataDirectory> directories;
+    public final ImmutableList<PartitionPosition> positions;
+    final long ringVersion;
+    final int directoriesVersion;
+
+    /**
+     * @param directories data directories, copied into an immutable list; null is stored as null
+     * @param positions boundary positions, copied into an immutable list; null is stored as null
+     * @param ringVersion ring version stamp kept with this snapshot
+     * @param diskVersion directories version stamp kept with this snapshot
+     */
+    DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+    {
+        this.directories = directories == null ? null : ImmutableList.copyOf(directories);
+        this.positions = positions == null ? null : ImmutableList.copyOf(positions);
+        this.ringVersion = ringVersion;
+        this.directoriesVersion = diskVersion;
+    }
+
+    /**
+     * Value equality over both version stamps, the directories and the positions. Both lists are compared
+     * null-safely, since the constructor stores null when handed a null argument.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        DiskBoundaries that = (DiskBoundaries) o;
+
+        if (ringVersion != that.ringVersion) return false;
+        if (directoriesVersion != that.directoriesVersion) return false;
+        if (directories != null ? !directories.equals(that.directories) : that.directories != null) return false;
+        return positions != null ? positions.equals(that.positions) : that.positions == null;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = directories != null ? directories.hashCode() : 0;
+        result = 31 * result + (positions != null ? positions.hashCode() : 0);
+        result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32));
+        result = 31 * result + directoriesVersion;
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "DiskBoundaries{" +
+               "directories=" + directories +
+               ", positions=" + positions +
+               ", ringVersion=" + ringVersion +
+               ", directoriesVersion=" + directoriesVersion +
+               '}';
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
new file mode 100644
index 0000000..7872554
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Splitter;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Caches the {@link DiskBoundaries} of a table and recomputes them when the cached value is missing or
+ * was computed for a different ring version or directories version.
+ */
+public class DiskBoundaryManager
+{
+    private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class);
+    private volatile DiskBoundaries diskBoundaries;
+
+    /**
+     * Returns the disk boundaries for the given table, recomputing and caching them if the cached value is
+     * out of date. Never returns null.
+     *
+     * @param cfs the table to get boundaries for
+     * @return the current boundaries; {@code positions} is null when the partitioner has no splitter or
+     *         when there are no local ranges
+     */
+    public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
+        // take a single snapshot of the volatile field and base every decision on that copy: invalidate()
+        // can null the field and another thread can publish a fresh value at any moment, so testing the
+        // field while returning an earlier read could hand a null back to the caller
+        // - it is ok to race, compaction will move any incorrect tokens to their correct places, but
+        // returning null would be bad
+        DiskBoundaries db = diskBoundaries;
+        if (isOutOfDate(db))
+        {
+            synchronized (this)
+            {
+                // re-read under the lock, another thread may have refreshed the cache while we waited
+                db = diskBoundaries;
+                if (isOutOfDate(db))
+                {
+                    logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+                    DiskBoundaries oldBoundaries = db;
+                    db = diskBoundaries = getDiskBoundaryValue(cfs);
+                    logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, db, cfs.keyspace.getName(), cfs.getTableName());
+                }
+            }
+        }
+        return db;
+    }
+
+    /**
+     * check if the given disk boundaries are out of date due to not being set or having too old diskVersion/ringVersion
+     */
+    private boolean isOutOfDate(DiskBoundaries db)
+    {
+        if (db == null)
+            return true;
+        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+        return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+    }
+
+    /**
+     * Computes fresh boundaries for the table from its local ranges and writeable data directories. Each
+     * input is re-read until its version counter is the same before and after the read.
+     */
+    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
+    {
+        Collection<Range<Token>> localRanges;
+
+        long ringVersion;
+        TokenMetadata tmd;
+        do
+        {
+            tmd = StorageService.instance.getTokenMetadata();
+            ringVersion = tmd.getRingVersion();
+            if (StorageService.instance.isBootstrapMode())
+            {
+                localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
+            }
+            else
+            {
+                // Reason we use the future settled TMD is that if we decommission a node, we want to stream
+                // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
+                // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
+                localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress());
+            }
+            logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion);
+        }
+        while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that
+                                                     // it might have changed before we calculated localRanges - recalculate
+
+        int directoriesVersion;
+        Directories.DataDirectory[] dirs;
+        do
+        {
+            directoriesVersion = BlacklistedDirectories.getDirectoriesVersion();
+            dirs = cfs.getDirectories().getWriteableLocations();
+        }
+        while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate
+
+        if (localRanges == null || localRanges.isEmpty())
+            return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
+
+        List<Range<Token>> sortedLocalRanges = Range.sort(localRanges);
+
+        List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs);
+        return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
+    }
+
+    /**
+     * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not.
+     *
+     * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
+     * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk
+     * etc.
+     *
+     * The final entry in the returned list will always be the partitioner maximum tokens upper key bound
+     */
+    private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+    {
+        assert partitioner.splitter().isPresent();
+        Splitter splitter = partitioner.splitter().get();
+        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
+        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges);
+        // If we can't split by ranges, split evenly to ensure utilisation of all disks
+        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
+            boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false);
+
+        List<PartitionPosition> diskBoundaries = new ArrayList<>();
+        for (int i = 0; i < boundaries.size() - 1; i++)
+            diskBoundaries.add(boundaries.get(i).maxKeyBound());
+        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
+        return diskBoundaries;
+    }
+
+    /**
+     * Drops the cached boundaries so that the next {@link #getDiskBoundaries(ColumnFamilyStore)} call recomputes them.
+     */
+    public void invalidate()
+    {
+        diskBoundaries = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index cffdb80..01fd451 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -344,6 +344,8 @@ public class Keyspace
                                                                                     StorageService.instance.getTokenMetadata(),
                                                                                     DatabaseDescriptor.getEndpointSnitch(),
                                                                                     ksm.params.replication.options);
+        logger.debug("New replication strategy instance - invalidating disk boundary cache");
+        columnFamilyStores.values().forEach(ColumnFamilyStore::invalidateDiskBoundaries);
     }
 
     // best invoked on the compaction mananger.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index e0b27fa..cf04016 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -54,7 +53,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.HeapPool;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
@@ -297,20 +295,17 @@ public class Memtable implements Comparable<Memtable>
 
     public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
     {
-        List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
-
-        if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
-            return Collections.singletonList(new FlushRunnable(txn));
-
-        return createFlushRunnables(localRanges, txn);
+        return createFlushRunnables(txn);
     }
 
-    private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+    private List<FlushRunnable> createFlushRunnables(LifecycleTransaction txn)
     {
-        assert cfs.getPartitioner().splitter().isPresent();
+        DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
+        List<PartitionPosition> boundaries = diskBoundaries.positions;
+        List<Directories.DataDirectory> locations = diskBoundaries.directories;
+        if (boundaries == null)
+            return Collections.singletonList(new FlushRunnable(txn));
 
-        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
         List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
         PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
         try
@@ -318,7 +313,7 @@ public class Memtable implements Comparable<Memtable>
             for (int i = 0; i < boundaries.size(); i++)
             {
                 PartitionPosition t = boundaries.get(i);
-                runnables.add(new FlushRunnable(rangeStart, t, locations[i], txn));
+                runnables.add(new FlushRunnable(rangeStart, t, locations.get(i), txn));
                 rangeStart = t;
             }
             return runnables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 518e3b5..0a2b461 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -517,9 +517,7 @@ public class CompactionManager implements CompactionManagerMBean
             return AllSSTableOpStatus.ABORTED;
         }
 
-        final List<Range<Token>> localRanges = Range.sort(r);
-        final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+        final List<PartitionPosition> diskBoundaries = cfs.getDiskBoundaries().positions;
 
         return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
         {
@@ -531,7 +529,7 @@ public class CompactionManager implements CompactionManagerMBean
                 transaction.cancel(Sets.difference(originals, needsRelocation));
 
                 Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
-                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
+                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
 
                 int maxSize = 0;
                 for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -551,7 +549,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 if (!cfs.getPartitioner().splitter().isPresent())
                     return true;
-                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
                 Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
 
                 Directories.DataDirectory location = locations[directoryIndex];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 94def2a..6305096 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -25,6 +25,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.index.Index;
 import com.google.common.primitives.Ints;
 
@@ -36,7 +38,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
-import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
@@ -49,7 +50,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.StorageService;
 
 /**
  * Manages the compaction strategies.
@@ -207,7 +207,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
     {
-        int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+        int index = getCompactionStrategyIndex(cfs, sstable);
         readLock.lock();
         try
         {
@@ -231,30 +231,30 @@ public class CompactionStrategyManager implements INotificationConsumer
      * sstables in the correct locations and give them to the correct compaction strategy instance.
      *
      * @param cfs
-     * @param locations
      * @param sstable
      * @return
      */
-    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable)
+    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
     {
         if (!cfs.getPartitioner().splitter().isPresent())
             return 0;
 
-        Directories.DataDirectory[] directories = locations.getWriteableLocations();
-        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, directories);
-        if (boundaries == null)
+        DiskBoundaries boundaries = cfs.getDiskBoundaries();
+        List<Directories.DataDirectory> directories = boundaries.directories;
+
+        if (boundaries.positions == null)
         {
             // try to figure out location based on sstable directory:
-            for (int i = 0; i < directories.length; i++)
+            for (int i = 0; i < directories.size(); i++)
             {
-                Directories.DataDirectory directory = directories[i];
+                Directories.DataDirectory directory = directories.get(i);
                 if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
                     return i;
             }
             return 0;
         }
 
-        int pos = Collections.binarySearch(boundaries, sstable.first);
+        int pos = Collections.binarySearch(boundaries.positions, sstable.first);
         assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
         return -pos - 1;
     }
@@ -449,7 +449,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
         for (SSTableReader sstable : removed)
         {
-            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            int i = getCompactionStrategyIndex(cfs, sstable);
             if (sstable.isRepaired())
                 repairedRemoved.get(i).add(sstable);
             else
@@ -457,7 +457,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         for (SSTableReader sstable : added)
         {
-            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            int i = getCompactionStrategyIndex(cfs, sstable);
             if (sstable.isRepaired())
                 repairedAdded.get(i).add(sstable);
             else
@@ -494,7 +494,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             for (SSTableReader sstable : sstables)
             {
-                int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+                int index = getCompactionStrategyIndex(cfs, sstable);
                 if (sstable.isRepaired())
                 {
                     unrepaired.get(index).removeSSTable(sstable);
@@ -608,9 +608,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         for (SSTableReader sstable : sstables)
         {
             if (sstable.isRepaired())
-                repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+                repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
             else
-                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
+                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
         }
 
         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
@@ -647,7 +647,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         readLock.lock();
         try
         {
-            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
             Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
 
             for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -685,12 +685,12 @@ public class CompactionStrategyManager implements INotificationConsumer
         SSTableReader firstSSTable = Iterables.getFirst(input, null);
         assert firstSSTable != null;
         boolean repaired = firstSSTable.isRepaired();
-        int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
+        int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
         for (SSTableReader sstable : input)
         {
             if (sstable.isRepaired() != repaired)
                 throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-            if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
+            if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
                 throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
         }
     }
@@ -752,11 +752,11 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
                                                                          .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
-                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
 
             Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
                                                                            .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
-                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
 
 
             for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0007e30..b1f2e9f 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
-        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
         this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
         this.isCommutative = cfs.metadata.isCounter();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 205aebe..d2f816b 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.CompactionTask;
@@ -38,7 +39,6 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Transactional;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.service.StorageService;
 
 
 /**
@@ -58,7 +58,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
     protected final SSTableRewriter sstableWriter;
     protected final LifecycleTransaction txn;
-    private final Directories.DataDirectory[] locations;
+    private final List<Directories.DataDirectory> locations;
     private final List<PartitionPosition> diskBoundaries;
     private int locationIndex;
 
@@ -88,8 +88,9 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         sstableWriter = SSTableRewriter.construct(cfs, txn, keepOriginals, maxAge);
         minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
-        locations = cfs.getDirectories().getWriteableLocations();
-        diskBoundaries = StorageService.getDiskBoundaries(cfs);
+        DiskBoundaries db = cfs.getDiskBoundaries();
+        diskBoundaries = db.positions;
+        locations = db.directories;
         locationIndex = -1;
     }
 
@@ -174,8 +175,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
             locationIndex++;
         if (prevIdx >= 0)
-            logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]);
-        switchCompactionLocation(locations[locationIndex]);
+            logger.debug("Switching write location from {} to {}", locations.get(prevIdx), locations.get(locationIndex));
+        switchCompactionLocation(locations.get(locationIndex));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
index 3665da7..353aacb 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -26,19 +26,19 @@ import java.util.UUID;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class RangeAwareSSTableWriter implements SSTableMultiWriter
 {
     private final List<PartitionPosition> boundaries;
-    private final Directories.DataDirectory[] directories;
+    private final List<Directories.DataDirectory> directories;
     private final int sstableLevel;
     private final long estimatedKeys;
     private final long repairedAt;
@@ -53,15 +53,16 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
 
     public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader header) throws IOException
     {
-        directories = cfs.getDirectories().getWriteableLocations();
+        DiskBoundaries db = cfs.getDiskBoundaries();
+        directories = db.directories;
         this.sstableLevel = sstableLevel;
         this.cfs = cfs;
-        this.estimatedKeys = estimatedKeys / directories.length;
+        this.estimatedKeys = estimatedKeys / directories.size();
         this.repairedAt = repairedAt;
         this.format = format;
         this.txn = txn;
         this.header = header;
-        boundaries = StorageService.getDiskBoundaries(cfs, directories);
+        boundaries = db.positions;
         if (boundaries == null)
         {
             Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
@@ -90,7 +91,7 @@ public class RangeAwareSSTableWriter implements SSTableMultiWriter
             if (currentWriter != null)
                 finishedWriters.add(currentWriter);
 
-            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
+            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories.get(currentIndex))), format);
             currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header, txn);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5b4e552..e93430b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -5205,62 +5205,4 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
         logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB);
     }
-
-    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories)
-    {
-        if (!cfs.getPartitioner().splitter().isPresent())
-            return null;
-
-        Collection<Range<Token>> lr;
-
-        if (StorageService.instance.isBootstrapMode())
-        {
-            lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
-        }
-        else
-        {
-            // Reason we use use the future settled TMD is that if we decommission a node, we want to stream
-            // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
-            // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
-            TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled();
-            lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress());
-        }
-
-        if (lr == null || lr.isEmpty())
-            return null;
-        List<Range<Token>> localRanges = Range.sort(lr);
-
-        return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories);
-    }
-
-    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs)
-    {
-        return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations());
-    }
-
-    /**
-     * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not.
-     *
-     * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
-     * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk
-     * etc.
-     *
-     * The final entry in the returned list will always be the partitioner maximum tokens upper key bound
-     */
-    public static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
-    {
-        assert partitioner.splitter().isPresent();
-        Splitter splitter = partitioner.splitter().get();
-        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
-        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, dontSplitRanges);
-        // If we can't split by ranges, split evenly to ensure utilisation of all disks
-        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
-            boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, false);
-
-        List<PartitionPosition> diskBoundaries = new ArrayList<>();
-        for (int i = 0; i < boundaries.size() - 1; i++)
-            diskBoundaries.add(boundaries.get(i).maxKeyBound());
-        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
-        return diskBoundaries;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/14e46e46/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
new file mode 100644
index 0000000..de79959
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DiskBoundaryManagerTest extends CQLTester
+{
+    private DiskBoundaryManager dbm;
+    private MockCFS mock;
+    private Directories dirs;
+
+    @Before
+    public void setup()
+    {
+        BlacklistedDirectories.clearUnwritableUnsafe(); // presumably resets directories marked unwritable by an earlier test (blackListTest marks /tmp/3)
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.updateNormalTokens(BootStrapper.getRandomTokens(metadata, 10), FBUtilities.getBroadcastAddress()); // register 10 random tokens for this node's broadcast address
+        createTable("create table %s (id int primary key, x text)");
+        dbm = getCurrentColumnFamilyStore().diskBoundaryManager; // manager under test, taken from the table just created
+        dirs = new Directories(getCurrentColumnFamilyStore().metadata, Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                                                                          new Directories.DataDirectory(new File("/tmp/2")),
+                                                                                          new Directories.DataDirectory(new File("/tmp/3"))));
+        mock = new MockCFS(getCurrentColumnFamilyStore(), dirs); // same table, but backed by the three directories above
+    }
+
+    @Test
+    public void getBoundariesTest()
+    {
+        DiskBoundaries computed = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(3, computed.positions.size()); // setup() configures three data directories
+        assertEquals(computed.directories, dirs.getWriteableLocations()); // same directories, same order
+    }
+
+    @Test
+    public void blackListTest()
+    {
+        DiskBoundaries initial = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(3, initial.positions.size());
+        assertEquals(initial.directories, dirs.getWriteableLocations());
+        BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3")); // take the third directory out of rotation
+        DiskBoundaries afterBlacklist = dbm.getDiskBoundaries(mock);
+        Assert.assertEquals(2, afterBlacklist.positions.size());
+        List<Directories.DataDirectory> remaining = Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
+                                                                       new Directories.DataDirectory(new File("/tmp/2")));
+        Assert.assertEquals(remaining, afterBlacklist.directories);
+    }
+
+    @Test
+    public void updateTokensTest() throws UnknownHostException
+    {
+        DiskBoundaries beforeUpdate = dbm.getDiskBoundaries(mock);
+        StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10"));
+        DiskBoundaries afterUpdate = dbm.getDiskBoundaries(mock);
+        assertFalse(beforeUpdate.equals(afterUpdate)); // registering tokens for 127.0.0.10 must yield different boundaries
+    }
+
+    @Test
+    public void alterKeyspaceTest() throws Throwable
+    {
+        DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
+        // identity comparison on purpose - we only want to know that a new instance was cached after the ALTER
+        Assert.assertNotSame("disk boundaries were not recomputed after ALTER KEYSPACE", dbv1, dbv2);
+        DiskBoundaries dbv3 = dbm.getDiskBoundaries(mock);
+        // nothing changed between the last two lookups, so the very same instance must be handed out again
+        Assert.assertSame("unchanged disk boundaries should be served from the cache", dbv2, dbv3);
+    }
+
+    // Compares a directory list with a directory array element by element; fails the test on the first difference.
+    private static void assertEquals(List<Directories.DataDirectory> dir1, Directories.DataDirectory[] dir2)
+    {
+        // fail(String) instead of a bare fail(), so a broken run reports what actually differed
+        if (dir1.size() != dir2.length)
+            fail("directory count mismatch: list has " + dir1.size() + ", array has " + dir2.length);
+        for (int i = 0; i < dir2.length; i++)
+            if (!dir1.get(i).equals(dir2[i]))
+                fail("directory mismatch at index " + i + ": " + dir1.get(i) + " vs " + dir2[i]);
+    }
+
+    // Test-only ColumnFamilyStore that reuses an existing store's keyspace, table name and metadata but swaps in the given Directories
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true); // NOTE(review): meaning of 0/false/false/true is not visible here - check the ColumnFamilyStore constructor
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org