You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/11/29 09:14:27 UTC

[cassandra] branch cassandra-3.11 updated (5d930cc -> df4180f)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 5d930cc  Fix SELECT JSON formatting for the "duration" type
     new eda5db2  Fix various data directory prefix matching issues
     new df4180f  Merge branch 'cassandra-3.0' into cassandra-3.11

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/Directories.java  |  50 ++++---
 .../org/apache/cassandra/db/DiskBoundaries.java    |  11 +-
 .../apache/cassandra/db/DiskBoundaryManager.java   |   6 +-
 .../cassandra/db/compaction/CompactionManager.java |   4 +-
 .../compaction/writers/CompactionAwareWriter.java  |  15 ++-
 .../org/apache/cassandra/io/util/FileUtils.java    |   4 +-
 .../org/apache/cassandra/db/DirectoriesTest.java   | 149 +++++++++++++++++++++
 .../compaction/CompactionStrategyManagerTest.java  |   2 +-
 .../apache/cassandra/io/util/FileUtilsTest.java    |  11 ++
 10 files changed, 214 insertions(+), 39 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit df4180f5782ce7530dc30448a17a4f4dbe4fae44
Merge: 5d930cc eda5db2
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Fri Nov 29 09:53:37 2019 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/Directories.java  |  50 ++++---
 .../org/apache/cassandra/db/DiskBoundaries.java    |  11 +-
 .../apache/cassandra/db/DiskBoundaryManager.java   |   6 +-
 .../cassandra/db/compaction/CompactionManager.java |   4 +-
 .../compaction/writers/CompactionAwareWriter.java  |  15 ++-
 .../org/apache/cassandra/io/util/FileUtils.java    |   4 +-
 .../org/apache/cassandra/db/DirectoriesTest.java   | 149 +++++++++++++++++++++
 .../compaction/CompactionStrategyManagerTest.java  |   2 +-
 .../apache/cassandra/io/util/FileUtilsTest.java    |  11 ++
 10 files changed, 214 insertions(+), 39 deletions(-)

diff --cc CHANGES.txt
index 918eb1b,cdebc03..3ede6a4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.20
 +3.11.6
 + * Fix SELECT JSON formatting for the "duration" type (CASSANDRA-15075)
 + * Fix LegacyLayout to have same behavior as 2.x when handling unknown column names (CASSANDRA-15081)
 +Merged from 3.0:
+  * Fix various data directory prefix matching issues (CASSANDRA-13974)
   * Minimize clustering values in metadata collector (CASSANDRA-15400)
   * Avoid over-trimming of results in mixed mode clusters (CASSANDRA-15405)
   * validate value sizes in LegacyLayout (CASSANDRA-15373)
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 532bf98,b104509..4731b3e
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -21,11 -23,17 +21,12 @@@ import java.io.File
  import java.io.FileFilter;
  import java.io.IOError;
  import java.io.IOException;
 -import java.nio.file.FileVisitResult;
  import java.nio.file.Files;
  import java.nio.file.Path;
+ import java.nio.file.Paths;
 -import java.nio.file.SimpleFileVisitor;
 -import java.nio.file.attribute.BasicFileAttributes;
  import java.util.*;
  import java.util.concurrent.ThreadLocalRandom;
 -import java.util.concurrent.atomic.AtomicLong;
  import java.util.function.BiFunction;
 -import java.util.function.Consumer;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Predicate;
@@@ -177,6 -185,6 +178,7 @@@ public class Directorie
      private final CFMetaData metadata;
      private final DataDirectory[] paths;
      private final File[] dataPaths;
++    private final ImmutableMap<Path, DataDirectory> canonicalPathToDD;
  
      public Directories(final CFMetaData metadata)
      {
@@@ -199,6 -201,6 +201,8 @@@
          this.metadata = metadata;
          this.paths = paths;
  
++        ImmutableMap.Builder<Path, DataDirectory> canonicalPathsBuilder = ImmutableMap.builder();
++
          String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
          int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
          String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName;
@@@ -210,27 -212,27 +214,33 @@@
          for (int i = 0; i < paths.length; ++i)
          {
              // check if old SSTable directory exists
--            dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath);
++            File dataPath = new File(paths[i].location, oldSSTableRelativePath);
++            dataPaths[i] = dataPath;
++            canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]);
          }
--        boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
--        {
--            public boolean apply(File file)
--            {
--                return file.exists();
--            }
--        });
++        boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), File::exists);
          if (!olderDirectoryExists)
          {
++            canonicalPathsBuilder = ImmutableMap.builder();
              // use 2.1+ style
              String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId);
              for (int i = 0; i < paths.length; ++i)
--                dataPaths[i] = new File(paths[i].location, newSSTableRelativePath);
++            {
++                File dataPath = new File(paths[i].location, newSSTableRelativePath);;
++                dataPaths[i] = dataPath;
++                canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]);
++            }
          }
          // if index, then move to its own directory
          if (indexNameWithDot != null)
          {
++            canonicalPathsBuilder = ImmutableMap.builder();
              for (int i = 0; i < paths.length; ++i)
--                dataPaths[i] = new File(dataPaths[i], indexNameWithDot);
++            {
++                File dataPath = new File(dataPaths[i], indexNameWithDot);
++                dataPaths[i] = dataPath;
++                canonicalPathsBuilder.put(Paths.get(FileUtils.getCanonicalPath(dataPath)), paths[i]);
++            }
          }
  
          for (File dir : dataPaths)
@@@ -274,6 -276,6 +284,7 @@@
                  }
              }
          }
++        canonicalPathToDD = canonicalPathsBuilder.build();
      }
  
      /**
@@@ -291,19 -298,6 +307,13 @@@
          return null;
      }
  
-     public DataDirectory getDataDirectoryForFile(File directory)
++    public DataDirectory getDataDirectoryForFile(Descriptor descriptor)
 +    {
-         if (directory != null)
-         {
-             for (DataDirectory dataDirectory : paths)
-             {
-                 if (directory.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
-                     return dataDirectory;
-             }
-         }
++        if (descriptor != null)
++            return canonicalPathToDD.get(descriptor.directory.toPath());
 +        return null;
 +    }
 +
      public Descriptor find(String filename)
      {
          for (File dir : dataPaths)
diff --cc src/java/org/apache/cassandra/db/DiskBoundaries.java
index 7bfed28,0000000..90af893
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@@ -1,131 -1,0 +1,134 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.util.Collections;
 +import java.util.List;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableList;
 +
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.service.StorageService;
 +
 +public class DiskBoundaries
 +{
 +    public final List<Directories.DataDirectory> directories;
 +    public final ImmutableList<PartitionPosition> positions;
 +    final long ringVersion;
 +    final int directoriesVersion;
++    private final ColumnFamilyStore cfs;
 +    private volatile boolean isInvalid = false;
 +
-     public DiskBoundaries(Directories.DataDirectory[] directories, int diskVersion)
++    public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, int diskVersion)
 +    {
-         this(directories, null, -1, diskVersion);
++        this(cfs, directories, null, -1, diskVersion);
 +    }
 +
 +    @VisibleForTesting
-     public DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
++    public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
 +    {
 +        this.directories = directories == null ? null : ImmutableList.copyOf(directories);
 +        this.positions = positions == null ? null : ImmutableList.copyOf(positions);
 +        this.ringVersion = ringVersion;
 +        this.directoriesVersion = diskVersion;
++        this.cfs = cfs;
 +    }
 +
 +    public boolean equals(Object o)
 +    {
 +        if (this == o) return true;
 +        if (o == null || getClass() != o.getClass()) return false;
 +
 +        DiskBoundaries that = (DiskBoundaries) o;
 +
 +        if (ringVersion != that.ringVersion) return false;
 +        if (directoriesVersion != that.directoriesVersion) return false;
 +        if (!directories.equals(that.directories)) return false;
 +        return positions != null ? positions.equals(that.positions) : that.positions == null;
 +    }
 +
 +    public int hashCode()
 +    {
 +        int result = directories != null ? directories.hashCode() : 0;
 +        result = 31 * result + (positions != null ? positions.hashCode() : 0);
 +        result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32));
 +        result = 31 * result + directoriesVersion;
 +        return result;
 +    }
 +
 +    public String toString()
 +    {
 +        return "DiskBoundaries{" +
 +               "directories=" + directories +
 +               ", positions=" + positions +
 +               ", ringVersion=" + ringVersion +
 +               ", directoriesVersion=" + directoriesVersion +
 +               '}';
 +    }
 +
 +    /**
 +     * check if the given disk boundaries are out of date, either due to not being set or due to having too old a diskVersion/ringVersion
 +     */
 +    public boolean isOutOfDate()
 +    {
 +        if (isInvalid)
 +            return true;
 +        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
 +        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
 +        return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
 +    }
 +
 +    public void invalidate()
 +    {
 +        this.isInvalid = true;
 +    }
 +
 +    public int getDiskIndex(SSTableReader sstable)
 +    {
 +        if (positions == null)
 +        {
 +            return getBoundariesFromSSTableDirectory(sstable);
 +        }
 +
 +        int pos = Collections.binarySearch(positions, sstable.first);
 +        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
 +        return -pos - 1;
 +    }
 +
 +    /**
 +     * Try to figure out location based on sstable directory
 +     */
 +    private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
 +    {
++        Directories.DataDirectory actualDirectory = cfs.getDirectories().getDataDirectoryForFile(sstable.descriptor);
 +        for (int i = 0; i < directories.size(); i++)
 +        {
 +            Directories.DataDirectory directory = directories.get(i);
-             if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
++            if (actualDirectory != null && actualDirectory.equals(directory))
 +                return i;
 +        }
 +        return 0;
 +    }
 +
 +    public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
 +    {
 +        return directories.get(getDiskIndex(sstable));
 +    }
 +}
diff --cc src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 03cbf7b,0000000..61febe9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@@ -1,140 -1,0 +1,140 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Splitter;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.locator.TokenMetadata;
 +import org.apache.cassandra.service.PendingRangeCalculatorService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +public class DiskBoundaryManager
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class);
 +    private volatile DiskBoundaries diskBoundaries;
 +
 +    public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
 +    {
 +        if (!cfs.getPartitioner().splitter().isPresent())
-             return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
++            return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
 +        if (diskBoundaries == null || diskBoundaries.isOutOfDate())
 +        {
 +            synchronized (this)
 +            {
 +                if (diskBoundaries == null || diskBoundaries.isOutOfDate())
 +                {
 +                    logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
 +                    DiskBoundaries oldBoundaries = diskBoundaries;
 +                    diskBoundaries = getDiskBoundaryValue(cfs);
 +                    logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName());
 +                }
 +            }
 +        }
 +        return diskBoundaries;
 +    }
 +
 +    public void invalidate()
 +    {
 +       if (diskBoundaries != null)
 +           diskBoundaries.invalidate();
 +    }
 +
 +    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
 +    {
 +        Collection<Range<Token>> localRanges;
 +
 +        long ringVersion;
 +        TokenMetadata tmd;
 +        do
 +        {
 +            tmd = StorageService.instance.getTokenMetadata();
 +            ringVersion = tmd.getRingVersion();
 +            if (StorageService.instance.isBootstrapMode()
 +                && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally
 +            {
 +                PendingRangeCalculatorService.instance.blockUntilFinished();
 +                localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
 +            }
 +            else
 +            {
 +                // The reason we use the future settled TMD is that if we decommission a node, we want to stream
 +                // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
 +                // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
 +                localRanges = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd.cloneAfterAllSettled()).get(FBUtilities.getBroadcastAddress());
 +            }
 +            logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion);
 +        }
 +        while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that
 +                                                     // it might have changed before we calculated localRanges - recalculate
 +
 +        int directoriesVersion;
 +        Directories.DataDirectory[] dirs;
 +        do
 +        {
 +            directoriesVersion = BlacklistedDirectories.getDirectoriesVersion();
 +            dirs = cfs.getDirectories().getWriteableLocations();
 +        }
 +        while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate
 +
 +        if (localRanges == null || localRanges.isEmpty())
-             return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion);
++            return new DiskBoundaries(cfs, dirs, null, ringVersion, directoriesVersion);
 +
 +        List<Range<Token>> sortedLocalRanges = Range.sort(localRanges);
 +
 +        List<PartitionPosition> positions = getDiskBoundaries(sortedLocalRanges, cfs.getPartitioner(), dirs);
-         return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion);
++        return new DiskBoundaries(cfs, dirs, positions, ringVersion, directoriesVersion);
 +    }
 +
 +    /**
 +     * Returns a list of disk boundaries, the result will differ depending on whether vnodes are enabled or not.
 +     *
 +     * What is returned are upper bounds for the disks, meaning everything from partitioner.minToken up to
 +     * getDiskBoundaries(..).get(0) should be on the first disk, everything between 0 to 1 should be on the second disk
 +     * etc.
 +     *
 +     * The final entry in the returned list will always be the partitioner maximum tokens upper key bound
 +     */
 +    private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
 +    {
 +        assert partitioner.splitter().isPresent();
 +        Splitter splitter = partitioner.splitter().get();
 +        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
 +        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, dontSplitRanges);
 +        // If we can't split by ranges, split evenly to ensure utilisation of all disks
 +        if (dontSplitRanges && boundaries.size() < dataDirectories.length)
 +            boundaries = splitter.splitOwnedRanges(dataDirectories.length, sortedLocalRanges, false);
 +
 +        List<PartitionPosition> diskBoundaries = new ArrayList<>();
 +        for (int i = 0; i < boundaries.size() - 1; i++)
 +            diskBoundaries.add(boundaries.get(i).maxKeyBound());
 +        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
 +        return diskBoundaries;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a08d08b,2b9ee50..56d2d29
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -503,115 -499,6 +503,115 @@@ public class CompactionManager implemen
          }, jobs, OperationType.CLEANUP);
      }
  
 +    public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
 +    {
 +        assert !cfStore.isIndex();
 +
 +        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Iterable<SSTableReader> originals = transaction.originals();
 +                if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
 +                    originals = Iterables.filter(originals, SSTableReader::isRepaired);
 +                List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
 +                Collections.sort(sortedSSTables, SSTableReader.maxTimestampAscending);
 +                return sortedSSTables;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn) throws IOException
 +            {
 +                logger.debug("Garbage collecting {}", txn.originals());
 +                CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
 +                {
 +                    @Override
 +                    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
 +                    {
 +                        return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption);
 +                    }
 +                };
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.GARBAGE_COLLECT);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.GARBAGE_COLLECT);
 +    }
 +
 +    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
 +    {
 +        if (!cfs.getPartitioner().splitter().isPresent())
 +        {
 +            logger.info("Partitioner does not support splitting");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +        final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());
 +
 +        if (r.isEmpty())
 +        {
 +            logger.info("Relocate cannot run before a node has joined the ring");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +
 +        final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
 +
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
 +                Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
 +                transaction.cancel(Sets.difference(originals, needsRelocation));
 +
 +                Map<Integer, List<SSTableReader>> groupedByDisk = groupByDiskIndex(needsRelocation);
 +
 +                int maxSize = 0;
 +                for (List<SSTableReader> diskSSTables : groupedByDisk.values())
 +                    maxSize = Math.max(maxSize, diskSSTables.size());
 +
 +                List<SSTableReader> mixedSSTables = new ArrayList<>();
 +
 +                for (int i = 0; i < maxSize; i++)
 +                    for (List<SSTableReader> diskSSTables : groupedByDisk.values())
 +                        if (i < diskSSTables.size())
 +                            mixedSSTables.add(diskSSTables.get(i));
 +
 +                return mixedSSTables;
 +            }
 +
 +            public Map<Integer, List<SSTableReader>> groupByDiskIndex(Set<SSTableReader> needsRelocation)
 +            {
 +                return needsRelocation.stream().collect(Collectors.groupingBy((s) -> diskBoundaries.getDiskIndex(s)));
 +            }
 +
 +            private boolean inCorrectLocation(SSTableReader sstable)
 +            {
 +                if (!cfs.getPartitioner().splitter().isPresent())
 +                    return true;
 +
 +                int diskIndex = diskBoundaries.getDiskIndex(sstable);
-                 File diskLocation = diskBoundaries.directories.get(diskIndex).location;
 +                PartitionPosition diskLast = diskBoundaries.positions.get(diskIndex);
 +
 +                // the location we get from directoryIndex is based on the first key in the sstable
 +                // now we need to make sure the last key is less than the boundary as well:
-                 return sstable.descriptor.directory.getAbsolutePath().startsWith(diskLocation.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0;
++                Directories.DataDirectory dataDirectory = cfs.getDirectories().getDataDirectoryForFile(sstable.descriptor);
++                return diskBoundaries.directories.get(diskIndex).equals(dataDirectory) && sstable.last.compareTo(diskLast) <= 0;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn)
 +            {
 +                logger.debug("Relocating {}", txn.originals());
 +                AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.RELOCATE);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.RELOCATE);
 +    }
 +
      /**
       * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
       * as repaired.
diff --cc src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index d2f816b,d33d72c..bc6115e
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@@ -34,11 -27,9 +34,12 @@@ import org.apache.cassandra.db.Partitio
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.db.compaction.CompactionTask;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
++import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.io.sstable.SSTableRewriter;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.concurrent.Transactional;
 +import org.apache.cassandra.db.compaction.OperationType;
  
  
  /**
@@@ -197,45 -147,13 +198,45 @@@ public abstract class CompactionAwareWr
  
      /**
       * Return a directory where we can expect expectedWriteSize to fit.
 +     *
 +     * @param sstables the sstables to compact
 +     * @return
       */
 -    public Directories.DataDirectory getWriteDirectory(long expectedWriteSize)
 +    public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize)
      {
-         File directory = null;
 -        Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize);
 -        if (directory == null)
 -            throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes");
++        Descriptor descriptor = null;
 +        for (SSTableReader sstable : sstables)
 +        {
-             if (directory == null)
-                 directory = sstable.descriptor.directory;
-             if (!directory.equals(sstable.descriptor.directory))
++            if (descriptor == null)
++                descriptor = sstable.descriptor;
++            if (!descriptor.directory.equals(sstable.descriptor.directory))
 +            {
-                 logger.trace("All sstables not from the same disk - putting results in {}", directory);
++                logger.trace("All sstables not from the same disk - putting results in {}", descriptor.directory);
 +                break;
 +            }
 +        }
-         Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(directory);
++        Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(descriptor);
 +        if (d != null)
 +        {
 +            long availableSpace = d.getAvailableSpace();
 +            if (availableSpace < estimatedWriteSize)
 +                throw new RuntimeException(String.format("Not enough space to write %s to %s (%s available)",
 +                                                         FBUtilities.prettyPrintMemory(estimatedWriteSize),
 +                                                         d.location,
 +                                                         FBUtilities.prettyPrintMemory(availableSpace)));
-             logger.trace("putting compaction results in {}", directory);
++            logger.trace("putting compaction results in {}", descriptor.directory);
 +            return d;
 +        }
 +        d = getDirectories().getWriteableLocation(estimatedWriteSize);
 +        if (d == null)
 +            throw new RuntimeException(String.format("Not enough disk space to store %s",
 +                                                     FBUtilities.prettyPrintMemory(estimatedWriteSize)));
 +        return d;
 +    }
  
 -        return directory;
 +    public CompactionAwareWriter setRepairedAt(long repairedAt)
 +    {
 +        this.sstableWriter.setRepairedAt(repairedAt);
 +        return this;
      }
  }
diff --cc test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 7e800e6,e4268f6..24e03f6
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@@ -25,6 -28,6 +28,7 @@@ import java.util.concurrent.Callable
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  
++import com.google.common.collect.Sets;
  import org.apache.commons.lang3.StringUtils;
  import org.junit.AfterClass;
  import org.junit.BeforeClass;
@@@ -471,6 -470,46 +475,150 @@@ public class DirectoriesTes
          }
      }
  
+     @Test
+     public void testGetLocationForDisk()
+     {
 -        DataDirectory [] paths = new DataDirectory[3];
 -        paths[0] = new DataDirectory(new File("/tmp/aaa"));
 -        paths[1] = new DataDirectory(new File("/tmp/aa"));
 -        paths[2] = new DataDirectory(new File("/tmp/a"));
++        Collection<DataDirectory> paths = new ArrayList<>();
++        paths.add(new DataDirectory(new File("/tmp/aaa")));
++        paths.add(new DataDirectory(new File("/tmp/aa")));
++        paths.add(new DataDirectory(new File("/tmp/a")));
+ 
+         for (CFMetaData cfm : CFM)
+         {
+             Directories dirs = new Directories(cfm, paths);
+             for (DataDirectory dir : paths)
+             {
+                 String p = dirs.getLocationForDisk(dir).getAbsolutePath() + File.separator;
+                 assertTrue(p.startsWith(dir.location.getAbsolutePath() + File.separator));
+             }
+         }
+     }
+ 
+     @Test
+     public void testGetLocationWithSymlinks() throws IOException
+     {
+         Path p = Files.createTempDirectory("something");
+         Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget"));
+         Path ddir = Files.createDirectories(p.resolve("datadir1"));
+ 
+         Path p1 = Files.createDirectories(ddir.resolve("p1").resolve("ks")).getParent(); // the data dir does not include the keyspace dir
+         Path p2 = Files.createDirectories(ddir.resolve("p2"));
+         Path l1 = Files.createSymbolicLink(p2.resolve("ks"), symlinktarget);
+ 
+         DataDirectory path1 = new DataDirectory(p1.toFile());
+         DataDirectory path2 = new DataDirectory(p2.toFile());
+         Directories dirs = new Directories(CFM.iterator().next(), new DataDirectory[] {path1, path2});
+         dirs.getLocationForDisk(new DataDirectory(p1.toFile()));
+         dirs.getLocationForDisk(new DataDirectory(p2.toFile()));
+ 
+         assertTrue(dirs.getLocationForDisk(path2).toPath().startsWith(l1));
+         assertTrue(dirs.getLocationForDisk(path1).toPath().startsWith(p1));
+     }
+ 
++    @Test
++    public void getDataDirectoryForFile()
++    {
++        Collection<DataDirectory> paths = new ArrayList<>();
++        paths.add(new DataDirectory(new File("/tmp/a")));
++        paths.add(new DataDirectory(new File("/tmp/aa")));
++        paths.add(new DataDirectory(new File("/tmp/aaa")));
++
++        for (CFMetaData cfm : CFM)
++        {
++            Directories dirs = new Directories(cfm, paths);
++            for (DataDirectory dir : paths)
++            {
++                Descriptor d = Descriptor.fromFilename(new File(dir.location, getNewFilename(cfm, false)).toString());
++                String p = dirs.getDataDirectoryForFile(d).location.getAbsolutePath() + File.separator;
++                assertTrue(p.startsWith(dir.location.getAbsolutePath() + File.separator));
++            }
++        }
++    }
++
++    /**
++     * Makes sure we can find the data directory when it is a symlink
++     *
++     * creates the following data directories:
++     * <tempdir something>/datadir1
++     * <tempdir something>/datadir11 (symlink to <tempdir something>/symlinktarget)
++     *
++     * and then makes sure that we get the correct directory back.
++     */
++    @Test
++    public void testDirectoriesSymlinks() throws IOException
++    {
++        Path p = Files.createTempDirectory("something");
++        Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget"));
++        Path ddir1 = Files.createDirectories(p.resolve("datadir1"));
++        Path ddir2 = Files.createSymbolicLink(p.resolve("datadir11"), symlinktarget);
++        DataDirectory dd1 = new DataDirectory(ddir1.toFile());
++        DataDirectory dd2 = new DataDirectory(ddir2.toFile());
++
++        for (CFMetaData tm : CFM)
++        {
++            Directories dirs = new Directories(tm, Sets.newHashSet(dd1, dd2));
++            Descriptor desc = Descriptor.fromFilename(ddir1.resolve(getNewFilename(tm, false)).toString());
++            assertEquals(ddir1.toFile(), dirs.getDataDirectoryForFile(desc).location);
++            desc = Descriptor.fromFilename(ddir2.resolve(getNewFilename(tm, false)).toString());
++            assertEquals(ddir2.toFile(), dirs.getDataDirectoryForFile(desc).location);
++        }
++    }
++
++    @Test
++    public void testDirectoriesOldTableSymlink() throws IOException
++    {
++        testDirectoriesSymlinksHelper(true);
++    }
++
++    @Test
++    public void testDirectoriesTableSymlink() throws IOException
++    {
++        testDirectoriesSymlinksHelper(false);
++    }
++
++    /**
++     * Makes sure we can find the data directory for a file when the table directory is a symlink
++     *
++     * if oldStyle is false we append the table id to the table directory
++     *
++     * creates the following structure
++     * <tempdir>/datadir1/<ks>/<table>
++     * <tempdir>/datadir11/<ks>/<table symlink to <tempdir>/symlinktarget>
++     *
++     * and then we create a fake descriptor to a file in the table directory and make sure we get the correct
++     * data directory back.
++     */
++    private void testDirectoriesSymlinksHelper(boolean oldStyle) throws IOException
++    {
++        Path p = Files.createTempDirectory("something");
++        Path symlinktarget = Files.createDirectories(p.resolve("symlinktarget"));
++        Path ddir1 = Files.createDirectories(p.resolve("datadir1"));
++        Path ddir2 = Files.createDirectories(p.resolve("datadir11"));
++
++        for (CFMetaData metadata : CFM)
++        {
++            Path keyspacedir = Files.createDirectories(ddir2.resolve(metadata.ksName));
++            String tabledir = metadata.cfName + (oldStyle ? "" : Component.separator +  ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId)));
++            Files.createSymbolicLink(keyspacedir.resolve(tabledir), symlinktarget);
++        }
++
++        DataDirectory dd1 = new DataDirectory(ddir1.toFile());
++        DataDirectory dd2 = new DataDirectory(ddir2.toFile());
++        for (CFMetaData tm : CFM)
++        {
++            Directories dirs = new Directories(tm, Sets.newHashSet(dd1, dd2));
++            Descriptor desc = Descriptor.fromFilename(ddir1.resolve(getNewFilename(tm, oldStyle)).toFile().toString());
++            assertEquals(ddir1.toFile(), dirs.getDataDirectoryForFile(desc).location);
++            desc = Descriptor.fromFilename(ddir2.resolve(getNewFilename(tm, oldStyle)).toFile().toString());
++            assertEquals(ddir2.toFile(), dirs.getDataDirectoryForFile(desc).location);
++        }
++    }
++
++    private String getNewFilename(CFMetaData metadata, boolean oldStyle)
++    {
++        return metadata.ksName + File.separator + metadata.cfName + (oldStyle ? "" : Component.separator +  ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId))) + "/na-1-big-Data.db";
++    }
++
      private List<Directories.DataDirectoryCandidate> getWriteableDirectories(DataDirectory[] dataDirectories, long writeSize)
      {
          // copied from Directories.getWriteableLocation(long)
@@@ -492,4 -531,4 +640,5 @@@
  
          return candidates;
      }
++
  }
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index c654fcd,0000000..2120757
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@@ -1,290 -1,0 +1,290 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db.compaction;
 +
 +import java.io.File;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.io.Files;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Directories;
 +import org.apache.cassandra.db.DiskBoundaries;
 +import org.apache.cassandra.db.DiskBoundaryManager;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.RowUpdateBuilder;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.notifications.SSTableAddedNotification;
 +import org.apache.cassandra.notifications.SSTableDeletingNotification;
 +import org.apache.cassandra.schema.CompactionParams;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.service.StorageService;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +public class CompactionStrategyManagerTest
 +{
 +    private static final String KS_PREFIX = "Keyspace1";
 +    private static final String TABLE_PREFIX = "CF_STANDARD";
 +
 +    private static IPartitioner originalPartitioner;
 +    private static boolean backups;
 +
 +    @BeforeClass
 +    public static void beforeClass()
 +    {
 +        SchemaLoader.prepareServer();
 +        backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
 +        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
 +        /**
 +         * We use the byte ordered partitioner in this test to be able to easily infer an SSTable's
 +         * disk assignment based on its generation - see {@link #getSSTableIndex(Integer[], SSTableReader)}
 +         */
 +        originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
 +    }
 +
 +    @AfterClass
 +    public static void afterClass()
 +    {
 +        DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
 +        DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
 +    }
 +
 +    @Test
 +    public void testSSTablesAssignedToCorrectCompactionStrategy()
 +    {
 +        // Creates 100 SSTables with keys 0-99
 +        int numSSTables = 100;
 +        SchemaLoader.createKeyspace(KS_PREFIX,
 +                                    KeyspaceParams.simple(1),
 +                                    SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
 +                                                .compaction(CompactionParams.scts(Collections.emptyMap())));
 +        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
 +        cfs.disableAutoCompaction();
 +        for (int i = 0; i < numSSTables; i++)
 +        {
 +            createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
 +        }
 +
 +        // Creates a CompactionStrategyManager for each disk count from 2 to 9 and checks
 +        // that the SSTables are assigned to the correct compaction strategies
 +        for (int numDisks = 2; numDisks < 10; numDisks++)
 +        {
 +            testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
 +        }
 +    }
 +
 +    public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
 +    {
 +        // Create a mock CFS with the given number of disks
 +        MockCFS cfs = createJBODMockCFS(numDisks);
 +        //Check that CFS will contain numSSTables
 +        assertEquals(numSSTables, cfs.getLiveSSTables().size());
 +
 +        // Creates a compaction strategy manager with an external boundary supplier
 +        final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
 +
 +        MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
 +        System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
 +        CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
 +                                                                      true);
 +
 +        // Check that SSTables are assigned to the correct Compaction Strategy
 +        for (SSTableReader reader : cfs.getLiveSSTables())
 +        {
 +            verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
 +        }
 +
 +        for (int delta = 1; delta <= 3; delta++)
 +        {
 +            // Update disk boundaries
 +            Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
 +            updateBoundaries(mockBoundaryManager, boundaries, delta);
 +
 +            // Check that SSTables are still assigned to the previous boundary layout
 +            System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
 +            for (SSTableReader reader : cfs.getLiveSSTables())
 +            {
 +                verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
 +            }
 +
 +            // Reload CompactionStrategyManager so new disk boundaries will be loaded
 +            csm.maybeReloadDiskBoundaries();
 +
 +            for (SSTableReader reader : cfs.getLiveSSTables())
 +            {
 +                // Check that SSTables are assigned to the new boundary layout
 +                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
 +
 +                // Remove SSTable and check that it will be removed from the correct compaction strategy
 +                csm.handleNotification(new SSTableDeletingNotification(reader), this);
 +                assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
 +
 +                // Add SSTable again and check that is correctly assigned
 +                csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
 +                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
 +            }
 +        }
 +    }
 +
 +    private MockCFS createJBODMockCFS(int disks)
 +    {
 +        // Create #disks data directories to simulate JBOD
 +        Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
 +        for (int i = 0; i < disks; ++i)
 +        {
 +            File tempDir = Files.createTempDir();
 +            tempDir.deleteOnExit();
 +            directories[i] = new Directories.DataDirectory(tempDir);
 +        }
 +
 +        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
 +        MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
 +        mockCFS.disableAutoCompaction();
 +        mockCFS.addSSTables(cfs.getLiveSSTables());
 +        return mockCFS;
 +    }
 +
 +    /**
 +     * Shifts every boundary except the last by delta (sign alternates with index parity), then invalidates the manager's boundaries
 +     */
 +    private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
 +    {
 +        for (int j = 0; j < boundaries.length - 1; j++)
 +        {
 +            if ((j + delta) % 2 == 0)
 +                boundaries[j] -= delta;
 +            else
 +                boundaries[j] += delta;
 +        }
 +        boundaryManager.invalidateBoundaries();
 +    }
 +
 +    private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
 +    {
 +        // Check that sstable is assigned to correct disk
 +        int index = getSSTableIndex(boundaries, reader);
 +        assertEquals(index, csm.compactionStrategyIndexFor(reader));
 +        // Check that compaction strategy actually contains SSTable
 +        assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
 +    }
 +
 +    /**
 +     * Creates disk boundaries such that each disk receives
 +     * an equal number of SSTables (the last disk absorbs any remainder)
 +     */
 +    private Integer[] computeBoundaries(int numSSTables, int numDisks)
 +    {
 +        Integer[] result = new Integer[numDisks];
 +        int sstablesPerRange = numSSTables / numDisks;
 +        result[0] = sstablesPerRange;
 +        for (int i = 1; i < numDisks; i++)
 +        {
 +            result[i] = result[i - 1] + sstablesPerRange;
 +        }
 +        result[numDisks - 1] = numSSTables; // make the last boundary always be the number of SSTables so integer-division rounding cannot drop any
 +        return result;
 +    }
 +
 +    /**
 +     * The test writes one key (0-99) per SSTable and relies on SSTable generations
 +     * being numbered 1-100 (TODO confirm). Since we are using ByteOrderedPartitioner,
 +     * the SSTable's position in the disk boundaries is the index of the first
 +     * boundary that is not smaller than the SSTable's generation
 +     */
 +    private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
 +    {
 +        int index = 0;
 +        while (boundaries[index] < reader.descriptor.generation)
 +            index++;
 +        System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index);
 +        return index;
 +    }
 +
 +
 +
 +    class MockBoundaryManager
 +    {
 +        private final ColumnFamilyStore cfs;
 +        private Integer[] positions;
 +        private DiskBoundaries boundaries;
 +
 +        public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
 +        {
 +            this.cfs = cfs;
 +            this.positions = positions;
 +            this.boundaries = createDiskBoundaries(cfs, positions);
 +        }
 +
 +        public void invalidateBoundaries()
 +        {
 +            boundaries.invalidate();
 +        }
 +
 +        public DiskBoundaries getBoundaries()
 +        {
 +            if (boundaries.isOutOfDate())
 +                boundaries = createDiskBoundaries(cfs, positions);
 +            return boundaries;
 +        }
 +
 +        private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries)
 +        {
 +            List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList());
-             return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
++            return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
 +        }
 +    }
 +
 +    private static void createSSTableWithKey(String keyspace, String table, int key)
 +    {
 +        long timestamp = System.currentTimeMillis();
 +        DecoratedKey dk = Util.dk(String.format("%04d", key));
 +        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 +        new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
 +        .clustering(Integer.toString(key))
 +        .add("val", "val")
 +        .build()
 +        .applyUnsafe();
 +        cfs.forceBlockingFlush();
 +    }
 +
 +    // just to be able to override the data directories
 +    private static class MockCFS extends ColumnFamilyStore
 +    {
 +        MockCFS(ColumnFamilyStore cfs, Directories dirs)
 +        {
 +            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
 +        }
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
index 0e7d8c8,8d1b752..2f9ccd4
--- a/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
+++ b/test/unit/org/apache/cassandra/io/util/FileUtilsTest.java
@@@ -20,19 -20,13 +20,20 @@@ package org.apache.cassandra.io.util
  
  import java.io.File;
  import java.io.IOException;
 +import java.io.RandomAccessFile;
  import java.nio.charset.Charset;
  import java.nio.file.Files;
 +import java.nio.file.Path;
 +import java.nio.file.Paths;
 +import java.util.Arrays;
  
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +
  import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
  
  public class FileUtilsTest
@@@ -66,45 -54,12 +67,55 @@@
      }
  
      @Test
 +    public void testFolderSize() throws Exception
 +    {
 +        File folder = createFolder(Paths.get(DatabaseDescriptor.getAllDataFileLocations()[0], "testFolderSize"));
 +        folder.deleteOnExit();
 +
 +        File childFolder = createFolder(Paths.get(folder.getPath(), "child"));
 +
 +        File[] files = {
 +                       createFile(new File(folder, "001"), 10000),
 +                       createFile(new File(folder, "002"), 1000),
 +                       createFile(new File(folder, "003"), 100),
 +                       createFile(new File(childFolder, "001"), 1000),
 +                       createFile(new File(childFolder, "002"), 2000),
 +        };
 +
 +        assertEquals(0, FileUtils.folderSize(new File(folder, "i_dont_exist")));
 +        assertEquals(files[0].length(), FileUtils.folderSize(files[0]));
 +
 +        long size = FileUtils.folderSize(folder);
 +        assertEquals(Arrays.stream(files).mapToLong(f -> f.length()).sum(), size);
 +    }
 +
++    @Test
+     public void testIsContained()
+     {
+         assertTrue(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abc")));
+         assertFalse(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abcd")));
+         assertTrue(FileUtils.isContained(new File("/tmp/abc"), new File("/tmp/abc/d")));
+         assertTrue(FileUtils.isContained(new File("/tmp/abc/../abc"), new File("/tmp/abc/d")));
+         assertFalse(FileUtils.isContained(new File("/tmp/abc/../abc"), new File("/tmp/abcc")));
+     }
++
 +    private File createFolder(Path path)
 +    {
 +        File folder = path.toFile();
 +        FileUtils.createDirectory(folder);
 +        return folder;
 +    }
 +
 +    private File createFile(File file, long size)
 +    {
 +        try (RandomAccessFile f = new RandomAccessFile(file, "rw"))
 +        {
 +            f.setLength(size);
 +        }
 +        catch (Exception e)
 +        {
 +            System.err.println(e);
 +        }
 +        return file;
 +    }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org