You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/06/12 10:38:23 UTC

[cassandra] branch cassandra-3.0 updated (c092c46 -> 0a1e8d1)

This is an automated email from the ASF dual-hosted git repository.

samt pushed a change to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from c092c46  Merge branch 'cassandra-2.2' into cassandra-3.0
     new c8c3c26  Fix nomenclature of deny and allow lists
     new 0a1e8d1  Merge branch 'cassandra-2.2' into cassandra-3.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |  1 +
 .../cassandra/cql3/functions/UDFunction.java       | 22 ++++++++--------
 src/java/org/apache/cassandra/db/Directories.java  | 28 ++++++++++-----------
 ...Directories.java => DisallowedDirectories.java} | 29 +++++++++++-----------
 ...sMBean.java => DisallowedDirectoriesMBean.java} |  3 ++-
 .../db/compaction/AbstractCompactionStrategy.java  |  6 ++---
 .../cassandra/db/compaction/LeveledManifest.java   |  2 +-
 .../cassandra/hints/HintsDispatchExecutor.java     |  2 +-
 .../org/apache/cassandra/hints/HintsStore.java     | 10 ++++----
 .../cassandra/io/sstable/format/SSTableReader.java |  2 +-
 .../cassandra/service/DefaultFSErrorHandler.java   |  6 ++---
 test/unit/org/apache/cassandra/Util.java           |  4 +--
 .../org/apache/cassandra/db/DirectoriesTest.java   |  2 +-
 ....java => CorruptedSSTablesCompactionsTest.java} | 17 +++++++------
 14 files changed, 69 insertions(+), 65 deletions(-)
 rename src/java/org/apache/cassandra/db/{BlacklistedDirectories.java => DisallowedDirectories.java} (77%)
 rename src/java/org/apache/cassandra/db/{BlacklistedDirectoriesMBean.java => DisallowedDirectoriesMBean.java} (95%)
 rename test/unit/org/apache/cassandra/db/compaction/{BlacklistingCompactionsTest.java => CorruptedSSTablesCompactionsTest.java} (93%)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0a1e8d168bac0f562774814a808e313e1d2d6571
Merge: c092c46 c8c3c26
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Fri Jun 12 11:21:14 2020 +0100

    Merge branch 'cassandra-2.2' into cassandra-3.0

 CHANGES.txt                                        |  1 +
 .../cassandra/cql3/functions/UDFunction.java       | 22 ++++++++--------
 src/java/org/apache/cassandra/db/Directories.java  | 28 ++++++++++-----------
 ...Directories.java => DisallowedDirectories.java} | 29 +++++++++++-----------
 ...sMBean.java => DisallowedDirectoriesMBean.java} |  3 ++-
 .../db/compaction/AbstractCompactionStrategy.java  |  6 ++---
 .../cassandra/db/compaction/LeveledManifest.java   |  2 +-
 .../cassandra/hints/HintsDispatchExecutor.java     |  2 +-
 .../org/apache/cassandra/hints/HintsStore.java     | 10 ++++----
 .../cassandra/io/sstable/format/SSTableReader.java |  2 +-
 .../cassandra/service/DefaultFSErrorHandler.java   |  6 ++---
 test/unit/org/apache/cassandra/Util.java           |  4 +--
 .../org/apache/cassandra/db/DirectoriesTest.java   |  2 +-
 ....java => CorruptedSSTablesCompactionsTest.java} | 17 +++++++------
 14 files changed, 69 insertions(+), 65 deletions(-)

diff --cc CHANGES.txt
index 3fdbb96,b10a057..d506dc8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,19 -1,5 +1,20 @@@
 -2.2.18
 +3.0.21
 + * Fix replica-side filtering returning stale data with CL > ONE (CASSANDRA-8272, CASSANDRA-8273)
 + * Fix duplicated row on 2.x upgrades when multi-rows range tombstones interact with collection ones (CASSANDRA-15805)
 + * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
 + * EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
 + * Fix index queries on partition key columns when some partitions contains only static data (CASSANDRA-13666)
 + * Avoid creating duplicate rows during major upgrades (CASSANDRA-15789)
 + * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
 + * Fix Debian init start/stop (CASSANDRA-15770)
 + * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
 + * Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595)
 + * cqlsh return non-zero status when STDIN CQL fails (CASSANDRA-15623)
 + * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690)
 + * Memtable memory allocations may deadlock (CASSANDRA-15367)
 + * Run evictFromMembership in GossipStage (CASSANDRA-15592)
 +Merged from 2.2:
+  * Fix nomenclature of allow and deny lists (CASSANDRA-15862)
   * Remove generated files from source artifact (CASSANDRA-15849)
   * Remove duplicated tools binaries from tarballs (CASSANDRA-15768)
   * Duplicate results with DISTINCT queries in mixed mode (CASSANDRA-15501)
diff --cc src/java/org/apache/cassandra/cql3/functions/UDFunction.java
index 7b69342,1e5cea6..27f9eb8
--- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
+++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java
@@@ -71,111 -49,10 +71,111 @@@ public abstract class UDFunction extend
      protected final String language;
      protected final String body;
  
 -    protected final DataType[] argDataTypes;
 -    protected final DataType returnDataType;
 +    protected final TypeCodec<Object>[] argCodecs;
 +    protected final TypeCodec<Object> returnCodec;
      protected final boolean calledOnNullInput;
  
 +    //
-     // Access to classes is controlled via a whitelist and a blacklist.
++    // Access to classes is controlled via allow and disallow lists.
 +    //
 +    // When a class is requested (both during compilation and runtime),
-     // the whitelistedPatterns array is searched first, whether the
++    // the allowedPatterns array is searched first, whether the
 +    // requested name matches one of the patterns. If not, nothing is
 +    // returned from the class-loader - meaning ClassNotFoundException
 +    // during runtime and "type could not be resolved" during compilation.
 +    //
-     // If a whitelisted pattern has been found, the blacklistedPatterns
++    // If an allowed pattern has been found, the disallowedPatterns
 +    // array is searched for a match. If a match is found, class-loader
 +    // rejects access. Otherwise the class/resource can be loaded.
 +    //
-     private static final String[] whitelistedPatterns =
++    private static final String[] allowedPatterns =
 +    {
 +    "com/datastax/driver/core/",
 +    "com/google/common/reflect/TypeToken",
 +    "java/io/IOException.class",
 +    "java/io/Serializable.class",
 +    "java/lang/",
 +    "java/math/",
 +    "java/net/InetAddress.class",
 +    "java/net/Inet4Address.class",
 +    "java/net/Inet6Address.class",
 +    "java/net/UnknownHostException.class", // req'd by InetAddress
 +    "java/net/NetworkInterface.class", // req'd by InetAddress
 +    "java/net/SocketException.class", // req'd by InetAddress
 +    "java/nio/Buffer.class",
 +    "java/nio/ByteBuffer.class",
 +    "java/text/",
 +    "java/time/",
 +    "java/util/",
 +    "org/apache/cassandra/cql3/functions/JavaUDF.class",
 +    "org/apache/cassandra/exceptions/",
 +    };
-     // Only need to blacklist a pattern, if it would otherwise be allowed via whitelistedPatterns
-     private static final String[] blacklistedPatterns =
++    // Only need to disallow a pattern, if it would otherwise be allowed via allowedPatterns
++    private static final String[] disallowedPatterns =
 +    {
 +    "com/datastax/driver/core/Cluster.class",
 +    "com/datastax/driver/core/Metrics.class",
 +    "com/datastax/driver/core/NettyOptions.class",
 +    "com/datastax/driver/core/Session.class",
 +    "com/datastax/driver/core/Statement.class",
 +    "com/datastax/driver/core/TimestampGenerator.class", // indirectly covers ServerSideTimestampGenerator + ThreadLocalMonotonicTimestampGenerator
 +    "java/lang/Compiler.class",
 +    "java/lang/InheritableThreadLocal.class",
 +    "java/lang/Package.class",
 +    "java/lang/Process.class",
 +    "java/lang/ProcessBuilder.class",
 +    "java/lang/ProcessEnvironment.class",
 +    "java/lang/ProcessImpl.class",
 +    "java/lang/Runnable.class",
 +    "java/lang/Runtime.class",
 +    "java/lang/Shutdown.class",
 +    "java/lang/Thread.class",
 +    "java/lang/ThreadGroup.class",
 +    "java/lang/ThreadLocal.class",
 +    "java/lang/instrument/",
 +    "java/lang/invoke/",
 +    "java/lang/management/",
 +    "java/lang/ref/",
 +    "java/lang/reflect/",
 +    "java/util/ServiceLoader.class",
 +    "java/util/Timer.class",
 +    "java/util/concurrent/",
 +    "java/util/function/",
 +    "java/util/jar/",
 +    "java/util/logging/",
 +    "java/util/prefs/",
 +    "java/util/spi/",
 +    "java/util/stream/",
 +    "java/util/zip/",
 +    };
 +
 +    static boolean secureResource(String resource)
 +    {
 +        while (resource.startsWith("/"))
 +            resource = resource.substring(1);
 +
-         for (String white : whitelistedPatterns)
-             if (resource.startsWith(white))
++        for (String allowed : allowedPatterns)
++            if (resource.startsWith(allowed))
 +            {
 +
-                 // resource is in whitelistedPatterns, let's see if it is not explicityl blacklisted
-                 for (String black : blacklistedPatterns)
-                     if (resource.startsWith(black))
++                // resource is in allowedPatterns, let's see if it is not explicitly disallowed
++                for (String disallowed : disallowedPatterns)
++                    if (resource.startsWith(disallowed))
 +                    {
 +                        logger.trace("access denied: resource {}", resource);
 +                        return false;
 +                    }
 +
 +                return true;
 +            }
 +
 +        logger.trace("access denied: resource {}", resource);
 +        return false;
 +    }
 +
 +    // set up the UDF class loader with no parent class loader so that we have full control over which classes/resources a UDF uses
 +    static final ClassLoader udfClassLoader = new UDFClassLoader();
 +
      protected UDFunction(FunctionName name,
                           List<ColumnIdentifier> argNames,
                           List<AbstractType<?>> argTypes,
diff --cc src/java/org/apache/cassandra/db/Directories.java
index b104509,d1aa650..af3f63c
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -331,37 -313,9 +331,37 @@@ public class Directorie
      }
  
      /**
-      * Returns a temporary subdirectory on non-blacklisted data directory
++     * Returns a temporary subdirectory on an allowed data directory
 +     * that _currently_ has {@code writeSize} bytes as usable space.
 +     * This method does not create the temporary directory.
 +     *
-      * @throws IOError if all directories are blacklisted.
++     * @throws IOError if all directories are disallowed.
 +     */
 +    public File getTemporaryWriteableDirectoryAsFile(long writeSize)
 +    {
 +        File location = getLocationForDisk(getWriteableLocation(writeSize));
 +        if (location == null)
 +            return null;
 +        return new File(location, TMP_SUBDIR);
 +    }
 +
 +    public void removeTemporaryDirectories()
 +    {
 +        for (File dataDir : dataPaths)
 +        {
 +            File tmpDir = new File(dataDir, TMP_SUBDIR);
 +            if (tmpDir.exists())
 +            {
 +                logger.debug("Removing temporary directory {}", tmpDir);
 +                FileUtils.deleteRecursive(tmpDir);
 +            }
 +        }
 +    }
 +
 +    /**
-      * Returns a non-blacklisted data directory that _currently_ has {@code writeSize} bytes as usable space.
+      * Returns an allowed data directory that _currently_ has {@code writeSize} bytes as usable space.
       *
-      * @throws IOError if all directories are blacklisted.
+      * @throws IOError if all directories are disallowed.
       */
      public DataDirectory getWriteableLocation(long writeSize)
      {
@@@ -369,13 -323,13 +369,13 @@@
  
          long totalAvailable = 0L;
  
-         // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
+         // pick directories with enough space and so that resulting sstable dirs aren't disallowed for writes.
          boolean tooBig = false;
 -        for (DataDirectory dataDir : dataDirectories)
 +        for (DataDirectory dataDir : paths)
          {
-             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+             if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir)))
              {
-                 logger.trace("removing blacklisted candidate {}", dataDir.location);
+                 logger.trace("removing disallowed candidate {}", dataDir.location);
                  continue;
              }
              DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
@@@ -392,9 -346,9 +392,9 @@@
  
          if (candidates.isEmpty())
              if (tooBig)
 -                throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
 +                throw new FSDiskFullWriteError(new IOException("Insufficient disk space to write " + writeSize + " bytes"), "");
              else
-                 throw new FSWriteError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"), "");
+                 throw new FSWriteError(new IOException("All configured data directories have been disallowed as unwritable for erroring out"), "");
  
          // shortcut for single data directory systems
          if (candidates.size() == 1)
@@@ -437,9 -391,9 +437,9 @@@
          long writeSize = expectedTotalWriteSize / estimatedSSTables;
          long totalAvailable = 0L;
  
 -        for (DataDirectory dataDir : dataDirectories)
 +        for (DataDirectory dataDir : paths)
          {
-             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+             if (DisallowedDirectories.isUnwritable(getLocationForDisk(dataDir)))
                    continue;
              DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
              // exclude directory if its total writeSize does not fit to data directory
diff --cc src/java/org/apache/cassandra/db/DisallowedDirectories.java
index afa726b,c0518e2..75b5e79
--- a/src/java/org/apache/cassandra/db/DisallowedDirectories.java
+++ b/src/java/org/apache/cassandra/db/DisallowedDirectories.java
@@@ -25,16 -25,14 +25,16 @@@ import java.util.Collections
  import java.util.Set;
  import java.util.concurrent.CopyOnWriteArraySet;
  
 +import com.google.common.annotations.VisibleForTesting;
 +
- import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.MBeanWrapper;
  
- public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
+ public class DisallowedDirectories implements DisallowedDirectoriesMBean
  {
-     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BlacklistedDirectories";
-     private static final Logger logger = LoggerFactory.getLogger(BlacklistedDirectories.class);
-     private static final BlacklistedDirectories instance = new BlacklistedDirectories();
+     public static final String DEPRECATED_MBEAN_NAME = "org.apache.cassandra.db:type=BlacklistedDirectories";
+     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=DisallowedDirectories";
+     private static final Logger logger = LoggerFactory.getLogger(DisallowedDirectories.class);
+     private static final DisallowedDirectories instance = new DisallowedDirectories();
  
      private final Set<File> unreadableDirectories = new CopyOnWriteArraySet<File>();
      private final Set<File> unwritableDirectories = new CopyOnWriteArraySet<File>();
@@@ -90,19 -89,8 +91,19 @@@
      }
  
      /**
 +     * Testing only!
 +     * Clear the set of unwritable directories.
 +     */
 +    @VisibleForTesting
 +    public static void clearUnwritableUnsafe()
 +    {
 +        instance.unwritableDirectories.clear();
 +    }
 +
 +
 +    /**
-      * Tells whether or not the directory is blacklisted for reads.
-      * @return whether or not the directory is blacklisted for reads.
+      * Tells whether or not the directory is disallowed for reads.
+      * @return whether or not the directory is disallowed for reads.
       */
      public static boolean isUnreadable(File directory)
      {
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 2348d19,f9ed780..7219504
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -247,27 -243,22 +247,27 @@@ public abstract class AbstractCompactio
      }
  
      /**
-      * Filters SSTables that are to be blacklisted from the given collection
+      * Filters SSTables that are to be excluded from the given collection
       *
-      * @param originalCandidates The collection to check for blacklisted SSTables
-      * @return list of the SSTables with blacklisted ones filtered out
+      * @param originalCandidates The collection to check for excluded SSTables
+      * @return list of the SSTables with excluded ones filtered out
       */
 -    public static Iterable<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
 +    public static List<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
      {
 -        return Iterables.filter(originalCandidates, new Predicate<SSTableReader>()
 +        List<SSTableReader> filtered = new ArrayList<>();
 +        for (SSTableReader sstable : originalCandidates)
          {
 -            public boolean apply(SSTableReader sstable)
 -            {
 -                return !sstable.isMarkedSuspect();
 -            }
 -        });
 +            if (!sstable.isMarkedSuspect())
 +                filtered.add(sstable);
 +        }
 +        return filtered;
      }
  
 +
 +    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
 +    {
 +        return range == null ? getScanners(sstables, (Collection<Range<Token>>)null) : getScanners(sstables, Collections.singleton(range));
 +    }
      /**
       * Returns a list of KeyScanners given sstables and a range on which to scan.
       * The default implementation simply grab one SSTableScanner per-sstable, but overriding this method
diff --cc src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index d6c28d4,0000000..c562dd0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@@ -1,309 -1,0 +1,309 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.hints;
 +
 +import java.io.File;
 +import java.net.InetAddress;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.function.BooleanSupplier;
 +import java.util.function.Function;
 +import java.util.function.Supplier;
 +
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.io.FSReadError;
 +import org.apache.cassandra.service.StorageService;
 +
 +/**
 + * A multi-threaded (by default) executor for dispatching hints.
 + *
 + * Most of dispatch is triggered by {@link HintsDispatchTrigger} running every ~10 seconds.
 + */
 +final class HintsDispatchExecutor
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
 +
 +    private final File hintsDirectory;
 +    private final ExecutorService executor;
 +    private final AtomicBoolean isPaused;
 +    private final Function<InetAddress, Boolean> isAlive;
 +    private final Map<UUID, Future> scheduledDispatches;
 +
 +    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused, Function<InetAddress, Boolean> isAlive)
 +    {
 +        this.hintsDirectory = hintsDirectory;
 +        this.isPaused = isPaused;
 +        this.isAlive = isAlive;
 +
 +        scheduledDispatches = new ConcurrentHashMap<>();
 +        executor = new JMXEnabledThreadPoolExecutor(maxThreads, 1, TimeUnit.MINUTES,
 +                                                    new LinkedBlockingQueue<>(),
 +                                                    new NamedThreadFactory("HintsDispatcher", Thread.MIN_PRIORITY),
 +                                                    "internal");
 +    }
 +
 +    /*
 +     * It's safe to terminate dispatch in process and to deschedule dispatch.
 +     */
 +    void shutdownBlocking()
 +    {
 +        scheduledDispatches.clear();
 +        executor.shutdownNow();
 +        try
 +        {
 +            executor.awaitTermination(1, TimeUnit.MINUTES);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +    }
 +
 +    boolean isScheduled(HintsStore store)
 +    {
 +        return scheduledDispatches.containsKey(store.hostId);
 +    }
 +
 +    Future dispatch(HintsStore store)
 +    {
 +        return dispatch(store, store.hostId);
 +    }
 +
 +    Future dispatch(HintsStore store, UUID hostId)
 +    {
 +        /*
 +         * It is safe to perform dispatch for the same host id concurrently in two or more threads,
 +         * however there is nothing to win from it - so we don't.
 +         *
 +         * Additionally, having just one dispatch task per host id ensures that we'll never violate our per-destination
 +         * rate limit, without having to share a ratelimiter between threads.
 +         *
 +         * It also simplifies reasoning about dispatch sessions.
 +         */
 +        return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
 +    }
 +
 +    Future transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
 +    {
 +        return executor.submit(new TransferHintsTask(catalog, hostIdSupplier));
 +    }
 +
 +    void completeDispatchBlockingly(HintsStore store)
 +    {
 +        Future future = scheduledDispatches.get(store.hostId);
 +        try
 +        {
 +            if (future != null)
 +                future.get();
 +        }
 +        catch (ExecutionException | InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    void interruptDispatch(UUID hostId)
 +    {
 +        Future future = scheduledDispatches.remove(hostId);
 +
 +        if (null != future)
 +            future.cancel(true);
 +    }
 +
 +    private final class TransferHintsTask implements Runnable
 +    {
 +        private final HintsCatalog catalog;
 +
 +        /*
 +         * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest.
 +         * We use a supplier here to be able to get a new host if the current one dies during streaming.
 +         */
 +        private final Supplier<UUID> hostIdSupplier;
 +
 +        private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
 +        {
 +            this.catalog = catalog;
 +            this.hostIdSupplier = hostIdSupplier;
 +        }
 +
 +        @Override
 +        public void run()
 +        {
 +            UUID hostId = hostIdSupplier.get();
 +            InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
 +            logger.info("Transferring all hints to {}: {}", address, hostId);
 +            if (transfer(hostId))
 +                return;
 +
 +            logger.warn("Failed to transfer all hints to {}: {}; will retry in {} seconds", address, hostId, 10);
 +
 +            try
 +            {
 +                TimeUnit.SECONDS.sleep(10);
 +            }
 +            catch (InterruptedException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +
 +            hostId = hostIdSupplier.get();
 +            logger.info("Transferring all hints to {}: {}", address, hostId);
 +            if (!transfer(hostId))
 +            {
 +                logger.error("Failed to transfer all hints to {}: {}", address, hostId);
 +                throw new RuntimeException("Failed to transfer all hints to " + hostId);
 +            }
 +        }
 +
 +        private boolean transfer(UUID hostId)
 +        {
 +            catalog.stores()
 +                   .map(store -> new DispatchHintsTask(store, hostId))
 +                   .forEach(Runnable::run);
 +
 +            return !catalog.hasFiles();
 +        }
 +    }
 +
 +    private final class DispatchHintsTask implements Runnable
 +    {
 +        private final HintsStore store;
 +        private final UUID hostId;
 +        private final RateLimiter rateLimiter;
 +
 +        DispatchHintsTask(HintsStore store, UUID hostId)
 +        {
 +            this.store = store;
 +            this.hostId = hostId;
 +
 +            // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
 +            // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
 +            // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
 +            // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
 +            // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
 +            int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
 +            int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount;
 +            this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 +        }
 +
 +        public void run()
 +        {
 +            try
 +            {
 +                dispatch();
 +            }
 +            finally
 +            {
 +                scheduledDispatches.remove(hostId);
 +            }
 +        }
 +
 +        private void dispatch()
 +        {
 +            while (true)
 +            {
 +                if (isPaused.get())
 +                    break;
 +
 +                HintsDescriptor descriptor = store.poll();
 +                if (descriptor == null)
 +                    break;
 +
 +                try
 +                {
 +                    if (!dispatch(descriptor))
 +                        break;
 +                }
 +                catch (FSReadError e)
 +                {
 +                    logger.error("Failed to dispatch hints file {}: file is corrupted ({})", descriptor.fileName(), e);
 +                    store.cleanUp(descriptor);
-                     store.blacklist(descriptor);
++                    store.markCorrupted(descriptor);
 +                    throw e;
 +                }
 +            }
 +        }
 +
 +        /*
 +         * Will return true if dispatch was successful, false if we hit a failure (destination node went down, for example).
 +         */
 +        private boolean dispatch(HintsDescriptor descriptor)
 +        {
 +            logger.trace("Dispatching hints file {}", descriptor.fileName());
 +
 +            InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
 +            if (address != null)
 +                return deliver(descriptor, address);
 +
 +            // address == null means the target no longer exists; find a new home for each hint entry.
 +            convert(descriptor);
 +            return true;
 +        }
 +
 +        private boolean deliver(HintsDescriptor descriptor, InetAddress address)
 +        {
 +            File file = new File(hintsDirectory, descriptor.fileName());
 +            InputPosition offset = store.getDispatchOffset(descriptor);
 +
 +            BooleanSupplier shouldAbort = () -> !isAlive.apply(address) || isPaused.get();
 +            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, address, descriptor.hostId, shouldAbort))
 +            {
 +                if (offset != null)
 +                    dispatcher.seek(offset);
 +
 +                if (dispatcher.dispatch())
 +                {
 +                    store.delete(descriptor);
 +                    store.cleanUp(descriptor);
 +                    logger.info("Finished hinted handoff of file {} to endpoint {}: {}", descriptor.fileName(), address, hostId);
 +                    return true;
 +                }
 +                else
 +                {
 +                    store.markDispatchOffset(descriptor, dispatcher.dispatchPosition());
 +                    store.offerFirst(descriptor);
 +                    logger.info("Finished hinted handoff of file {} to endpoint {}: {}, partially", descriptor.fileName(), address, hostId);
 +                    return false;
 +                }
 +            }
 +        }
 +
 +        // for each hint in the hints file for a node that isn't part of the ring anymore, write RF hints for each replica
 +        private void convert(HintsDescriptor descriptor)
 +        {
 +            File file = new File(hintsDirectory, descriptor.fileName());
 +
 +            try (HintsReader reader = HintsReader.open(file, rateLimiter))
 +            {
 +                reader.forEach(page -> page.hintsIterator().forEachRemaining(HintsService.instance::writeForAllReplicas));
 +                store.delete(descriptor);
 +                store.cleanUp(descriptor);
 +                logger.info("Finished converting hints file {}", descriptor.fileName());
 +            }
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/hints/HintsStore.java
index 032de5a,0000000..b08fc72
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@@ -1,220 -1,0 +1,220 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.hints;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentLinkedDeque;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableMap;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +/**
 + * Encapsulates the state of a peer's hints: the queue of hints files for dispatch, and the current writer (if any).
 + *
 + * The queue for dispatch is multi-threading safe.
 + *
 + * The writer MUST only be accessed by {@link HintsWriteExecutor}.
 + */
 +final class HintsStore
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(HintsStore.class);
 +
 +    public final UUID hostId;
 +    private final File hintsDirectory;
 +    private final ImmutableMap<String, Object> writerParams;
 +
 +    private final Map<HintsDescriptor, InputPosition> dispatchPositions;
 +    private final Deque<HintsDescriptor> dispatchDequeue;
-     private final Queue<HintsDescriptor> blacklistedFiles;
++    private final Queue<HintsDescriptor> corruptedFiles;
 +
 +    // last timestamp used in a descriptor; make sure to not reuse the same timestamp for new descriptors.
 +    private volatile long lastUsedTimestamp;
 +    private volatile HintsWriter hintsWriter;
 +
 +    private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
 +    {
 +        this.hostId = hostId;
 +        this.hintsDirectory = hintsDirectory;
 +        this.writerParams = writerParams;
 +
 +        dispatchPositions = new ConcurrentHashMap<>();
 +        dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
-         blacklistedFiles = new ConcurrentLinkedQueue<>();
++        corruptedFiles = new ConcurrentLinkedQueue<>();
 +
 +        //noinspection resource
 +        lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
 +    }
 +
 +    static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
 +    {
 +        descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
 +        return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
 +    }
 +
 +    @VisibleForTesting
 +    int getDispatchQueueSize()
 +    {
 +        return dispatchDequeue.size();
 +    }
 +
 +    InetAddress address()
 +    {
 +        return StorageService.instance.getEndpointForHostId(hostId);
 +    }
 +
 +    boolean isLive()
 +    {
 +        InetAddress address = address();
 +        return address != null && FailureDetector.instance.isAlive(address);
 +    }
 +
 +    HintsDescriptor poll()
 +    {
 +        return dispatchDequeue.poll();
 +    }
 +
 +    void offerFirst(HintsDescriptor descriptor)
 +    {
 +        dispatchDequeue.offerFirst(descriptor);
 +    }
 +
 +    void offerLast(HintsDescriptor descriptor)
 +    {
 +        dispatchDequeue.offerLast(descriptor);
 +    }
 +
 +    void deleteAllHints()
 +    {
 +        HintsDescriptor descriptor;
 +        while ((descriptor = poll()) != null)
 +        {
 +            cleanUp(descriptor);
 +            delete(descriptor);
 +        }
 +
-         while ((descriptor = blacklistedFiles.poll()) != null)
++        while ((descriptor = corruptedFiles.poll()) != null)
 +        {
 +            cleanUp(descriptor);
 +            delete(descriptor);
 +        }
 +    }
 +
 +    void delete(HintsDescriptor descriptor)
 +    {
 +        File hintsFile = new File(hintsDirectory, descriptor.fileName());
 +        if (hintsFile.delete())
 +            logger.info("Deleted hint file {}", descriptor.fileName());
 +        else
 +            logger.error("Failed to delete hint file {}", descriptor.fileName());
 +
 +        //noinspection ResultOfMethodCallIgnored
 +        new File(hintsDirectory, descriptor.checksumFileName()).delete();
 +    }
 +
 +    boolean hasFiles()
 +    {
 +        return !dispatchDequeue.isEmpty();
 +    }
 +
 +    InputPosition getDispatchOffset(HintsDescriptor descriptor)
 +    {
 +        return dispatchPositions.get(descriptor);
 +    }
 +
 +    void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
 +    {
 +        dispatchPositions.put(descriptor, inputPosition);
 +    }
 +
 +    void cleanUp(HintsDescriptor descriptor)
 +    {
 +        dispatchPositions.remove(descriptor);
 +    }
 +
-     void blacklist(HintsDescriptor descriptor)
++    void markCorrupted(HintsDescriptor descriptor)
 +    {
-         blacklistedFiles.add(descriptor);
++        corruptedFiles.add(descriptor);
 +    }
 +
 +    /*
 +     * Methods dealing with HintsWriter.
 +     *
 +     * All of these, with the exception of isWriting(), are for exclusively single-threaded use by HintsWriteExecutor.
 +     */
 +
 +    boolean isWriting()
 +    {
 +        return hintsWriter != null;
 +    }
 +
 +    HintsWriter getOrOpenWriter()
 +    {
 +        if (hintsWriter == null)
 +            hintsWriter = openWriter();
 +        return hintsWriter;
 +    }
 +
 +    HintsWriter getWriter()
 +    {
 +        return hintsWriter;
 +    }
 +
 +    private HintsWriter openWriter()
 +    {
 +        lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
 +        HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams);
 +
 +        try
 +        {
 +            return HintsWriter.create(hintsDirectory, descriptor);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, descriptor.fileName());
 +        }
 +    }
 +
 +    void closeWriter()
 +    {
 +        if (hintsWriter != null)
 +        {
 +            hintsWriter.close();
 +            offerLast(hintsWriter.descriptor());
 +            hintsWriter = null;
 +            SyncUtil.trySyncDir(hintsDirectory);
 +        }
 +    }
 +
 +    void fsyncWriter()
 +    {
 +        if (hintsWriter != null)
 +            hintsWriter.fsync();
 +    }
 +}
diff --cc test/unit/org/apache/cassandra/Util.java
index d758efe,f6b4771..a49440d
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -539,113 -383,4 +539,113 @@@ public class Uti
      {
          thread.join(10000);
      }
 +
 +    public static AssertionError runCatchingAssertionError(Runnable test)
 +    {
 +        try
 +        {
 +            test.run();
 +            return null;
 +        }
 +        catch (AssertionError e)
 +        {
 +            return e;
 +        }
 +    }
 +
 +    /**
 +     * Wrapper function used to run a test that can sometimes flake for uncontrollable reasons.
 +     *
 +     * If the given test fails on the first run, it is executed the given number of times again, expecting all secondary
 +     * runs to succeed. If they do, the failure is understood as a flake and the test is treated as passing.
 +     *
 +     * Do not use this if the test is deterministic and its success is not influenced by external factors (such as time,
 +     * selection of random seed, network failures, etc.). If the test can be made independent of such factors, it is
 +     * probably preferable to do so rather than use this method.
 +     *
 +     * @param test The test to run.
 +     * @param rerunsOnFailure How many times to re-run it if it fails. All reruns must pass.
 +     * @param message Message to send to System.err on initial failure.
 +     */
 +    public static void flakyTest(Runnable test, int rerunsOnFailure, String message)
 +    {
 +        AssertionError e = runCatchingAssertionError(test);
 +        if (e == null)
 +            return;     // success
 +        System.err.format("Test failed. %s%n"
 +                        + "Re-running %d times to verify it isn't failing more often than it should.%n"
 +                        + "Failure was: %s%n", message, rerunsOnFailure, e);
 +        e.printStackTrace();
 +
 +        int rerunsFailed = 0;
 +        for (int i = 0; i < rerunsOnFailure; ++i)
 +        {
 +            AssertionError t = runCatchingAssertionError(test);
 +            if (t != null)
 +            {
 +                ++rerunsFailed;
 +                e.addSuppressed(t);
 +            }
 +        }
 +        if (rerunsFailed > 0)
 +        {
 +            System.err.format("Test failed in %d of the %d reruns.%n", rerunsFailed, rerunsOnFailure);
 +            throw e;
 +        }
 +
 +        System.err.println("All reruns succeeded. Failure treated as flake.");
 +    }
 +
 +    // for use with Optional in tests, can be used as an argument to orElseThrow
 +    public static Supplier<AssertionError> throwAssert(final String message)
 +    {
 +        return () -> new AssertionError(message);
 +    }
 +
 +    public static class UnfilteredSource extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator
 +    {
 +        Iterator<Unfiltered> content;
 +
 +        public UnfilteredSource(CFMetaData cfm, DecoratedKey partitionKey, Row staticRow, Iterator<Unfiltered> content)
 +        {
 +            super(cfm,
 +                  partitionKey,
 +                  DeletionTime.LIVE,
 +                  cfm.partitionColumns(),
 +                  staticRow != null ? staticRow : Rows.EMPTY_STATIC_ROW,
 +                  false,
 +                  EncodingStats.NO_STATS);
 +            this.content = content;
 +        }
 +
 +        @Override
 +        protected Unfiltered computeNext()
 +        {
 +            return content.hasNext() ? content.next() : endOfData();
 +        }
 +    }
 +
 +    public static UnfilteredPartitionIterator executeLocally(PartitionRangeReadCommand command,
 +                                                             ColumnFamilyStore cfs,
 +                                                             ReadOrderGroup orderGroup)
 +    {
 +        return command.queryStorage(cfs, orderGroup);
 +    }
 +
 +    public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)
 +    {
 +        try
 +        {
 +            for ( ; ; )
 +            {
 +                DataDirectory dir = cfs.getDirectories().getWriteableLocation(1);
-                 BlacklistedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir));
++                DisallowedDirectories.maybeMarkUnwritable(cfs.getDirectories().getLocationForDisk(dir));
 +            }
 +        }
 +        catch (IOError e)
 +        {
 +            // Expected -- marked all directories as unwritable
 +        }
-         return () -> BlacklistedDirectories.clearUnwritableUnsafe();
++        return () -> DisallowedDirectories.clearUnwritableUnsafe();
 +    }
  }
diff --cc test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
index 6378e09,5208401..9276737
--- a/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CorruptedSSTablesCompactionsTest.java
@@@ -29,34 -29,26 +29,34 @@@ import org.junit.AfterClass
  import org.junit.BeforeClass;
  import org.junit.Test;
  
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertNotNull;
 +
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.config.*;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.LongType;
  import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.locator.SimpleStrategy;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.schema.*;
  
 -import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertTrue;
 -import static org.apache.cassandra.Util.cellname;
  
- public class BlacklistingCompactionsTest
+ public class CorruptedSSTablesCompactionsTest
  {
-     private static final Logger logger = LoggerFactory.getLogger(BlacklistingCompactionsTest.class);
++    private static final Logger logger = LoggerFactory.getLogger(CorruptedSSTablesCompactionsTest.class);
 +
 +    private static Random random;
 +
-     private static final String KEYSPACE1 = "BlacklistingCompactionsTest";
+     private static final String KEYSPACE1 = "CorruptedSSTablesCompactionsTest";
 -    private static final String CF_STANDARD1 = "Standard1";
 -    // seed hardcoded to one we know works:
 -    private static final Random random = new Random(1);
 +    private static final String STANDARD_STCS = "Standard_STCS";
 +    private static final String STANDARD_LCS = "Standard_LCS";
 +    private static int maxValueSize;
  
      @After
      public void leakDetect() throws InterruptedException
@@@ -112,18 -81,18 +112,19 @@@
      }
  
      @Test
-     public void testBlacklistingWithSizeTieredCompactionStrategy() throws Exception
+     public void testCorruptedSSTablesWithSizeTieredCompactionStrategy() throws Exception
      {
-         testBlacklisting(STANDARD_STCS);
 -        testCorruptedSSTables(SizeTieredCompactionStrategy.class.getCanonicalName());
++        testCorruptedSSTables(STANDARD_STCS);
      }
  
      @Test
-     public void testBlacklistingWithLeveledCompactionStrategy() throws Exception
+     public void testCorruptedSSTablesWithLeveledCompactionStrategy() throws Exception
      {
-         testBlacklisting(STANDARD_LCS);
 -        testCorruptedSSTables(LeveledCompactionStrategy.class.getCanonicalName());
++        testCorruptedSSTables(STANDARD_LCS);
      }
  
-     private void testBlacklisting(String tableName) throws Exception
 -    public void testCorruptedSSTables(String compactionStrategy) throws Exception
++
++    public void testCorruptedSSTables(String tableName) throws Exception
      {
          // this test does enough rows to force multiple block indexes to be used
          Keyspace keyspace = Keyspace.open(KEYSPACE1);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org