Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/06/02 17:23:56 UTC

[cassandra] branch cassandra-4.1 updated (133ad50a84 -> 663e51d928)

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a change to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


    from 133ad50a84 Merge branch 'cassandra-4.0' into cassandra-4.1
     new a00d8fd5ba Ensure FileStreamTask cannot compromise shared channel proxy for system table when interrupted
     new 663e51d928 Merge branch 'cassandra-4.0' into cassandra-4.1

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/repair/LocalSyncTask.java |  46 ++++-----
 .../apache/cassandra/streaming/StreamSession.java  |   6 ++
 .../cassandra/streaming/StreamingChannel.java      |   9 ++
 .../async/NettyStreamingConnectionFactory.java     |   9 ++
 .../async/StreamingMultiplexedChannel.java         |  60 ++++++-----
 .../cassandra/tools/BulkLoadConnectionFactory.java |  21 +++-
 .../distributed/test/RepairErrorsTest.java         | 114 +++++++++++++++++++--
 8 files changed, 208 insertions(+), 58 deletions(-)
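
The title of revision a00d8fd5ba refers to an interrupted task compromising a shared channel. This email does not show that patch's reasoning, so the following is only a minimal sketch of the general JDK behaviour that such a title plausibly refers to: when a thread with its interrupt flag set performs I/O on a `FileChannel`, the JDK closes the channel for every user and throws `ClosedByInterruptException`. The class and method names here are hypothetical and are not part of Cassandra.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; not taken from the Cassandra code base.
public class InterruptClosesChannel
{
    /** Returns true if a read attempted by an interrupted thread left the channel closed. */
    static boolean channelClosedByInterrupt() throws IOException
    {
        Path file = Files.createTempFile("shared-channel", ".bin");
        Files.write(file, new byte[] { 1, 2, 3, 4 });
        try (FileChannel shared = FileChannel.open(file, StandardOpenOption.READ))
        {
            // Simulate an abort that interrupts the thread doing the I/O.
            Thread.currentThread().interrupt();
            try
            {
                shared.read(ByteBuffer.allocate(4));
                return false; // the read is expected to fail instead
            }
            catch (ClosedByInterruptException e)
            {
                // The channel is now closed for every other holder of this reference too.
                return !shared.isOpen();
            }
            finally
            {
                Thread.interrupted(); // clear the flag so cleanup below is unaffected
            }
        }
        finally
        {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws IOException
    {
        System.out.println("channel closed by interrupt: " + channelClosedByInterrupt());
    }
}
```

If the channel were shared with unrelated readers, they would all start failing after the interrupt, which is why avoiding interrupts around shared channels can matter.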




[cassandra] 01/01: Merge branch 'cassandra-4.0' into cassandra-4.1

Posted by ma...@apache.org.

maedhroz pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 663e51d92868cbd045a83d7fa53e373bb28721a0
Merge: 133ad50a84 a00d8fd5ba
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Thu Jun 2 12:07:38 2022 -0500

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/repair/LocalSyncTask.java |  46 ++++-----
 .../apache/cassandra/streaming/StreamSession.java  |   6 ++
 .../cassandra/streaming/StreamingChannel.java      |   9 ++
 .../async/NettyStreamingConnectionFactory.java     |   9 ++
 .../async/StreamingMultiplexedChannel.java         |  60 ++++++-----
 .../cassandra/tools/BulkLoadConnectionFactory.java |  21 +++-
 .../distributed/test/RepairErrorsTest.java         | 114 +++++++++++++++++++--
 8 files changed, 208 insertions(+), 58 deletions(-)
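
The `LocalSyncTask.java` hunks in the diff that follows replace a `synchronized` boolean flag with `AtomicBoolean.compareAndSet`, and make `abort()` register a callback on a future of the stream plan instead of checking a nullable field. The following is a rough, self-contained sketch of that pattern using only JDK classes. It follows the `CompletableFuture` form from the cassandra-4.0 side of the merge, since the 4.1 side uses Cassandra's own `Promise`/`AsyncPromise`. `SyncTaskSketch`, its counters, and the use of `Runnable` as a stand-in for a stream plan are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a sync task; not the actual LocalSyncTask.
public class SyncTaskSketch
{
    private final AtomicBoolean active = new AtomicBoolean(true);
    // Completed once the (stand-in) plan has been created and started.
    private final CompletableFuture<Runnable> planFuture = new CompletableFuture<>();

    final AtomicInteger failuresRecorded = new AtomicInteger();
    final AtomicInteger abortsRun = new AtomicInteger();

    void start()
    {
        // Stand-in for creating and executing a plan; running it represents aborting its sessions.
        Runnable abortPlan = abortsRun::incrementAndGet;
        planFuture.complete(abortPlan);
    }

    void onFailure(Throwable t)
    {
        // Only the first caller flips the flag, so no lock is needed.
        if (active.compareAndSet(true, false))
            failuresRecorded.incrementAndGet();
    }

    void abort()
    {
        // Runs immediately if the plan exists, otherwise as soon as start() completes the future.
        planFuture.whenComplete((abortPlan, cause) -> abortPlan.run());
    }

    public static void main(String[] args)
    {
        SyncTaskSketch task = new SyncTaskSketch();
        task.abort();                          // requested before the plan exists
        System.out.println("aborts before start: " + task.abortsRun.get());
        task.start();                          // completing the future triggers the deferred abort
        System.out.println("aborts after start: " + task.abortsRun.get());
        task.onFailure(new RuntimeException("first"));
        task.onFailure(new RuntimeException("second"));
        System.out.println("failures recorded: " + task.failuresRecorded.get());
    }
}
```

Deferring the abort to a callback removes the "aborted before starting" branch: the abort always acts on a real plan, and the compare-and-set guard keeps the failure path from running twice.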

diff --cc CHANGES.txt
index 07ccdf23a8,31c62caa57..cfb7bcb41a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,182 -1,7 +1,183 @@@
 -4.0.5
 +4.1-alpha2
 + * Remove expired snapshots of dropped tables after restart (CASSANDRA-17619) 
 +Merged from 4.0:
+  * Ensure FileStreamTask cannot compromise shared channel proxy for system table when interrupted (CASSANDRA-17663)
   * silence benign SslClosedEngineException (CASSANDRA-17565)
  Merged from 3.11:
 +Merged from 3.0:
 +
 +
 +4.1-alpha1
 + * Handle config parameters upper bound on startup; Fix auto_snapshot_ttl and paxos_purge_grace_period min unit validations (CASSANDRA-17571)
 + * Fix leak of non-standard Java types in our Exceptions as clients using JMX are unable to handle them.
 +   Remove useless validation that leads to unnecessary additional read of cassandra.yaml on startup (CASSANDRA-17638)
 + * Fix repair_request_timeout_in_ms and remove paxos_auto_repair_threshold_mb (CASSANDRA-17557)
 + * Incremental repair leaks SomeRepairFailedException after switch away from flatMap (CASSANDRA-17620)
 + * StorageService read threshold get methods throw NullPointerException due to not handling null configs (CASSANDRA-17593)
 + * Rename truncate_drop guardrail to drop_truncate_table (CASSANDRA-17592)
 + * nodetool enablefullquerylog can NPE when directory has no files (CASSANDRA-17595)
 + * Add auto_snapshot_ttl configuration (CASSANDRA-16790)
 + * List snapshots of dropped tables (CASSANDRA-16843)
 + * Add information whether sstables are dropped to SchemaChangeListener (CASSANDRA-17582)
 + * Add a pluggable memtable API (CEP-11 / CASSANDRA-17034)
 + * Save sstable id as string in activity table (CASSANDRA-17585)
 + * Implement startup check to prevent Cassandra to potentially spread zombie data (CASSANDRA-17180)
 + * Allow failing startup on duplicate config keys (CASSANDRA-17379)
 + * Migrate threshold for minimum keyspace replication factor to guardrails (CASSANDRA-17212)
 + * Add guardrail to disallow TRUNCATE and DROP TABLE commands (CASSANDRA-17558)
 + * Add plugin support for CQLSH (CASSANDRA-16456)
 + * Add guardrail to disallow querying with ALLOW FILTERING (CASSANDRA-17370)
 + * Enhance SnakeYAML properties to be reusable outside of YAML parsing, support camel case conversion to snake case, and add support to ignore properties (CASSANDRA-17166)
 + * nodetool compact should support using a key string to find the range to avoid operators having to manually do this (CASSANDRA-17537)
 + * Add guardrail for data disk usage (CASSANDRA-17150)
 + * Tool to list data paths of existing tables (CASSANDRA-17568)
 + * Migrate track_warnings to more standard naming conventions and use latest configuration types rather than long (CASSANDRA-17560)
 + * Add support for CONTAINS and CONTAINS KEY in conditional UPDATE and DELETE statement (CASSANDRA-10537)
 + * Migrate advanced config parameters to the new Config types (CASSANDRA-17431)
 + * Make null to be meaning disabled and leave 0 as a valid value for permissions_update_interval, roles_update_interval, credentials_update_interval (CASSANDRA-17431)
 + * Fix typo in Config annotation (CASSANDRA-17431)
 + * Made Converters type safe and fixed a few cases where converters used the wrong type (CASSANDRA-17431)
 + * Fix null bug in DataStorageSpec and DurationSpec and require units to be added when providing 0 value (CASSANDRA-17431)
 + * Shutdown ScheduledExecutors as part of node drainage (CASSANDRA-17493)
 + * Provide JMX endpoint to allow transient logging of blocking read repairs (CASSANDRA-17471)
 + * Add guardrail for GROUP BY queries (CASSANDRA-17509)
 + * make pylib PEP and pylint compliant (CASSANDRA-17546)
 + * Add support for vnodes in jvm-dtest (CASSANDRA-17332)
 + * Remove guardrails global enable flag (CASSANDRA-17499)
 + * Clients using JMX are unable to handle non-standard java types but we leak this into our interfaces (CASSANDRA-17527)
 + * Remove stress server functionality (CASSANDRA-17535)
 + * Reduce histogram snapshot long[] allocation overhead during speculative read and write threshold updates (CASSANDRA-17523)
 + * Add guardrail for creation of secondary indexes (CASSANDRA-17498)
 + * Add guardrail to disallow creation of uncompressed tables (CASSANDRA-17504)
 + * Add guardrail to disallow creation of new COMPACT STORAGE tables (CASSANDRA-17522)
 + * repair vtables should expose a completed field due to lack of filtering options in CQL (CASSANDRA-17520)
 + * remove outdated code from cqlsh (CASSANDRA-17490)
 + * remove support for deprecated version specific TLS in Python 3.6 (CASSANDRA-17365)
 + * Add support for IF EXISTS and IF NOT EXISTS in ALTER statements (CASSANDRA-16916)
 + * resolve several pylint issues in cqlsh.py and pylib (CASSANDRA-17480)
 + * Streaming sessions longer than 3 minutes fail with timeout (CASSANDRA-17510)
 + * Add ability to track state in repair (CASSANDRA-15399)
 + * Remove unused 'parse' module (CASSANDRA-17484)
 + * change six functions in cqlshlib to native Python 3 (CASSANDRA-17417)
 + * reduce hot-path object allocations required to record local/remote requests against the client request metrics (CASSANDRA-17424)
 + * Disallow removing DC from system_auth while nodes are active in the DC (CASSANDRA-17478)
 + * Add guardrail for the number of fields per UDT (CASSANDRA-17385)
 + * Allow users to change cqlsh history location using env variable (CASSANDRA-17448)
 + * Add required -f option to use nodetool verify and standalone sstableverify (CASSANDRA-17017)
 + * Add support for UUID based sstable generation identifiers (CASSANDRA-17048)
 + * Log largest memtable flush at info instead of debug (CASSANDRA-17472)
 + * Add native transport rate limiter options to example cassandra.yaml, and expose metric for dispatch rate (CASSANDRA-17423)
 + * Add diagnostic events for guardrails (CASSANDRA-17197)
 + * Pre hashed passwords in CQL (CASSANDRA-17334)
 + * Increase cqlsh version (CASSANDRA-17432)
 + * Update SUPPORTED_UPGRADE_PATHS to include 3.0 and 3.x to 4.1 paths and remove obsolete tests (CASSANDRA-17362)
 + * Support DELETE in CQLSSTableWriter (CASSANDRA-14797)
 + * Failed inbound internode authentication failures generate ugly warning with stack trace (CASSANDRA-17068)
 + * Expose gossip information in system_views.gossip_info virtual table (CASSANDRA-17002)
 + * Add guardrails for collection items and size (CASSANDRA-17153)
 + * Improve guardrails messages (CASSANDRA-17430)
 + * Remove all usages of junit.framework and ban them via Checkstyle (CASSANDRA-17316)
 + * Add guardrails for read/write consistency levels (CASSANDRA-17188)
 + * Add guardrail for SELECT IN terms and their cartesian product (CASSANDRA-17187)
 + * remove unused imports in cqlsh.py and cqlshlib (CASSANDRA-17413)
 + * deprecate property windows_timer_interval (CASSANDRA-17404)
 + * Expose streaming as a vtable (CASSANDRA-17390)
 + * Expose all client options via system_views.clients and nodetool clientstats (CASSANDRA-16378)
 + * Make startup checks configurable (CASSANDRA-17220)
 + * Add guardrail for number of partition keys on IN queries (CASSANDRA-17186)
 + * update Python test framework from nose to pytest (CASSANDRA-17293)
 + * Fix improper CDC commit log segments deletion in non-blocking mode (CASSANDRA-17233)
 + * Add support for string concatenations through the + operator (CASSANDRA-17190)
 + * Limit the maximum hints size per host (CASSANDRA-17142)
 + * Add a virtual table for exposing batch metrics (CASSANDRA-17225)
 + * Flatten guardrails config (CASSANDRA-17353)
 + * Instance failed to start up due to NPE in StartupClusterConnectivityChecker (CASSANDRA-17347)
 + * add the shorter version of version flag (-v) in cqlsh (CASSANDRA-17236)
 + * Make vtables accessible via internode messaging (CASSANDRA-17295)
 + * Add support for PEM based key material for SSL (CASSANDRA-17031)
 + * Standardize storage configuration parameters' names. Support unit suffixes. (CASSANDRA-15234)
 + * Remove support for Windows (CASSANDRA-16956)
 + * Runtime-configurable YAML option to prohibit USE statements (CASSANDRA-17318)
 + * When streaming sees a ClosedChannelException this triggers the disk failure policy (CASSANDRA-17116)
 + * Add a virtual table for exposing prepared statements metrics (CASSANDRA-17224)
 + * Remove python 2.x support from cqlsh (CASSANDRA-17242)
 + * Prewarm role and credential caches to avoid timeouts at startup (CASSANDRA-16958)
 + * Make capacity/validity/updateinterval/activeupdate for Auth Caches configurable via nodetool (CASSANDRA-17063)
 + * Added startup check for read_ahead_kb setting (CASSANDRA-16436)
 + * Avoid unecessary array allocations and initializations when performing query checks (CASSANDRA-17209)
 + * Add guardrail for list operations that require read before write (CASSANDRA-17154)
 + * Migrate thresholds for number of keyspaces and tables to guardrails (CASSANDRA-17195)
 + * Remove self-reference in SSTableTidier (CASSANDRA-17205)
 + * Add guardrail for query page size (CASSANDRA-17189)
 + * Allow column_index_size_in_kb to be configurable through nodetool (CASSANDRA-17121)
 + * Emit a metric for number of local read and write calls
 + * Add non-blocking mode for CDC writes (CASSANDRA-17001)
 + * Add guardrails framework (CASSANDRA-17147)
 + * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
 + * Make nodes more resilient to local unrelated files during startup (CASSANDRA-17082)
 + * repair prepare message would produce a wrong error message if network timeout happened rather than reply wait timeout (CASSANDRA-16992)
 + * Log queries that fail on timeout or unavailable errors up to once per minute by default (CASSANDRA-17159)
 + * Refactor normal/preview/IR repair to standardize repair cleanup and error handling of failed RepairJobs (CASSANDRA-17069)
 + * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
 + * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
 + * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
 + * Actively update auth cache in the background (CASSANDRA-16957)
 + * Add unix time conversion functions (CASSANDRA-17029)
 + * JVMStabilityInspector.forceHeapSpaceOomMaybe should handle all non-heap OOMs rather than only supporting direct only (CASSANDRA-17128)
 + * Forbid other Future implementations with checkstyle (CASSANDRA-17055)
 + * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085)
 + * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106)
 + * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054)
 + * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
 + * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
 + * Allow to GRANT or REVOKE multiple permissions in a single statement (CASSANDRA-17030)
 + * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
 + * Log time spent writing keys during compaction (CASSANDRA-17037)
 + * Make nodetool compactionstats and sstable_tasks consistent (CASSANDRA-16976)
 + * Add metrics and logging around index summary redistribution (CASSANDRA-17036)
 + * Add configuration options for minimum allowable replication factor and default replication factor (CASSANDRA-14557)
 + * Expose information about stored hints via a nodetool command and a virtual table (CASSANDRA-14795)
 + * Add broadcast_rpc_address to system.local (CASSANDRA-11181)
 + * Add support for type casting in WHERE clause components and in the values of INSERT/UPDATE statements (CASSANDRA-14337)
 + * add credentials file support to CQLSH (CASSANDRA-16983)
 + * Skip remaining bytes in the Envelope buffer when a ProtocolException is thrown to avoid double decoding (CASSANDRA-17026)
 + * Allow reverse iteration of resources during permissions checking (CASSANDRA-17016)
 + * Add feature to verify correct ownership of attached locations on disk at startup (CASSANDRA-16879)
 + * Make SSLContext creation pluggable/extensible (CASSANDRA-16666)
 + * Add soft/hard limits to local reads to protect against reading too much data in a single query (CASSANDRA-16896)
 + * Avoid token cache invalidation for removing a non-member node (CASSANDRA-15290)
 + * Allow configuration of consistency levels on auth operations (CASSANDRA-12988)
 + * Add number of sstables in a compaction to compactionstats output (CASSANDRA-16844)
 + * Upgrade Caffeine to 2.9.2 (CASSANDRA-15153)
 + * Allow DELETE and TRUNCATE to work on Virtual Tables if the implementation allows it (CASSANDRA-16806)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Reduce native transport max frame size to 16MB (CASSANDRA-16886)
 + * Add support for filtering using IN restrictions (CASSANDRA-14344)
 + * Provide a nodetool command to invalidate auth caches (CASSANDRA-16404)
 + * Catch read repair timeout exceptions and add metric (CASSANDRA-16880)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 + * Add client warnings and abort to tombstone and coordinator reads which go past a low/high watermark (CASSANDRA-16850)
 + * Add TTL support to nodetool snapshots (CASSANDRA-16789)
 + * Allow CommitLogSegmentReader to optionally skip sync marker CRC checks (CASSANDRA-16842)
 + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
 + * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
 + * Implement nodetool getauditlog command (CASSANDRA-16725)
 + * Clean up repair code (CASSANDRA-13720)
 + * Background schedule to clean up orphaned hints files (CASSANDRA-16815)
 + * Modify SecondaryIndexManager#indexPartition() to retrieve only columns for which indexes are actually being built (CASSANDRA-16776)
 + * Batch the token metadata update to improve the speed (CASSANDRA-15291)
 + * Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
 + * Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760)
 + * Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671)
 + * Fix AbstractReadQuery::toCQLString not returning valid CQL (CASSANDRA-16510)
 + * Log when compacting many tombstones (CASSANDRA-16780)
 + * Display bytes per level in tablestats for LCS tables (CASSANDRA-16799)
 + * Add isolated flush timer to CommitLogMetrics and ensure writes correspond to single WaitingOnCommit data points (CASSANDRA-16701)
 + * GossiperTest.testHasVersion3Nodes didn't take into account trunk version changes, fixed to rely on latest version (CASSANDRA-16651)
 + * Update JNA library to 5.9.0 and snappy-java to version 1.1.8.4 (CASSANDRA-17040)
 +Merged from 4.0:
 +Merged from 3.11:
  Merged from 3.0:
   * Fix issue where frozen maps may not be serialized in the correct order (CASSANDRA-17623)
   * Suppress CVE-2022-24823 (CASSANDRA-17633)
diff --cc src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 71cec282ad,99315754bd..28a6accff7
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@@ -19,6 -19,9 +19,7 @@@ package org.apache.cassandra.repair
  
  import java.util.Collections;
  import java.util.List;
 -import java.util.UUID;
 -import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.atomic.AtomicBoolean;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Preconditions;
@@@ -40,7 -43,6 +41,9 @@@ import org.apache.cassandra.streaming.S
  import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.TimeUUID;
++import org.apache.cassandra.utils.concurrent.AsyncPromise;
++import org.apache.cassandra.utils.concurrent.Promise;
  
  /**
   * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
@@@ -58,11 -60,11 +61,11 @@@ public class LocalSyncTask extends Sync
      @VisibleForTesting
      public final boolean transferRanges;
  
-     private boolean active = true;
-     private StreamPlan streamPlan;
+     private final AtomicBoolean active = new AtomicBoolean(true);
 -    private final CompletableFuture<StreamPlan> planFuture = new CompletableFuture<>();
++    private final Promise<StreamPlan> planPromise = new AsyncPromise<>();
  
      public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
 -                         List<Range<Token>> diff, UUID pendingRepair,
 +                         List<Range<Token>> diff, TimeUUID pendingRepair,
                           boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
      {
          super(desc, local, remote, diff, previewKind);
@@@ -115,8 -117,9 +118,9 @@@
              logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
              Tracing.traceRepair(message);
  
-             streamPlan = createStreamPlan();
-             streamPlan.execute();
+             StreamPlan plan = createStreamPlan();
+             plan.execute();
 -            planFuture.complete(plan);
++            planPromise.setSuccess(plan);
          }
      }
  
@@@ -169,12 -171,11 +172,11 @@@
      }
  
      @Override
-     public synchronized void onFailure(Throwable t)
+     public void onFailure(Throwable t)
      {
-         if (active)
+         if (active.compareAndSet(true, false))
          {
-             active = false;
 -            setException(t);
 +            tryFailure(t);
              finished();
          }
      }
@@@ -191,22 -192,12 +193,12 @@@
      }
  
      @Override
-     public synchronized void abort()
+     public void abort()
      {
-         if (active)
 -        planFuture.whenComplete((plan, cause) ->
++        planPromise.addCallback((plan, cause) ->
          {
-             if (streamPlan == null)
-             {
-                 active = false;
-                 String message = String.format("Sync for session %s between %s and %s on %s aborted before starting",
-                                                desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily);
-                 logger.debug("{} {}", previewKind.logPrefix(desc.sessionId), message);
-                 trySuccess(stat);
-             }
-             else
-             {
-                 streamPlan.getCoordinator().getAllStreamSessions().forEach(StreamSession::abort);
-             }
-         }
+             assert plan != null : "StreamPlan future should never be completed exceptionally";
+             plan.getCoordinator().getAllStreamSessions().forEach(StreamSession::abort);
+         });
      }
  }
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 595890def8,b2739b331a..2036262e54
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -1084,42 -1037,18 +1084,48 @@@ public class StreamSession implements I
          public void onClose(InetAddressAndPort from);
      }
  
 +    public static String createLogTag(StreamSession session)
 +    {
 +        return createLogTag(session, (Object) null);
 +    }
 +
 +    public static String createLogTag(StreamSession session, StreamingChannel channel)
 +    {
 +        return createLogTag(session, channel == null ? null : channel.id());
 +    }
 +
 +    public static String createLogTag(StreamSession session, Channel channel)
 +    {
 +        return createLogTag(session, channel == null ? null : channel.id());
 +    }
 +
 +    public static String createLogTag(StreamSession session, Object channelId)
 +    {
 +        StringBuilder sb = new StringBuilder(64);
 +        sb.append("[Stream");
 +
 +        if (session != null)
 +            sb.append(" #").append(session.planId());
 +
 +        if (channelId != null)
 +            sb.append(" channel: ").append(channelId);
 +
 +        sb.append(']');
 +        return sb.toString();
 +    }
 +
      public synchronized void abort()
      {
+         if (state.isFinalState())
+         {
+             logger.debug("[Stream #{}] Stream session with peer {} is already in a final state on abort.", planId(), peer);
+             return;
+         }
+ 
          logger.info("[Stream #{}] Aborting stream session with peer {}...", planId(), peer);
  
 -        if (getMessageSender().connected())
 -            getMessageSender().sendMessage(new SessionFailedMessage());
 +        if (channel.connected())
 +            channel.sendControlMessage(new SessionFailedMessage());
  
          try
          {
diff --cc src/java/org/apache/cassandra/streaming/StreamingChannel.java
index f49089c48a,0000000000..a638638bc2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/StreamingChannel.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
@@@ -1,78 -1,0 +1,87 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.streaming;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.util.function.IntFunction;
 +
 +import io.netty.util.concurrent.Future; //checkstyle: permit this import
 +import org.apache.cassandra.streaming.async.NettyStreamingConnectionFactory;
 +import org.apache.cassandra.utils.Shared;
 +
 +import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
 +import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 +
 +@Shared(scope = SIMULATION, inner = INTERFACES)
 +public interface StreamingChannel
 +{
 +    public interface Factory
 +    {
 +        public static class Global
 +        {
 +            private static StreamingChannel.Factory FACTORY = new NettyStreamingConnectionFactory();
 +            public static StreamingChannel.Factory streamingFactory()
 +            {
 +                return FACTORY;
 +            }
 +
 +            public static void unsafeSet(StreamingChannel.Factory factory)
 +            {
 +                FACTORY = factory;
 +            }
 +        }
 +
 +        StreamingChannel create(InetSocketAddress to, int messagingVersion, Kind kind) throws IOException;
++
++        default StreamingChannel create(InetSocketAddress to,
++                                        InetSocketAddress preferred,
++                                        int messagingVersion,
++                                        StreamingChannel.Kind kind) throws IOException
++        {
++            // Implementations can decide whether or not to do something with the preferred address.
++            return create(to, messagingVersion, kind);
++        }
 +    }
 +
 +    public enum Kind { CONTROL, FILE }
 +
 +    public interface Send
 +    {
 +        void send(IntFunction<StreamingDataOutputPlus> outSupplier) throws IOException;
 +    }
 +
 +    Object id();
 +    String description();
 +
 +    InetSocketAddress peer();
 +    InetSocketAddress connectedTo();
 +    boolean connected();
 +
 +    StreamingDataInputPlus in();
 +
 +    /**
 +     * until closed, cannot invoke {@link #send(Send)}
 +     */
 +    StreamingDataOutputPlus acquireOut();
 +    Future<?> send(Send send) throws IOException;
 +
 +    Future<?> close();
 +    void onClose(Runnable runOnClose);
 +}
diff --cc src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java
index 946df59892,0000000000..6a57e395e4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingConnectionFactory.java
@@@ -1,76 -1,0 +1,85 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.streaming.async;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelPipeline;
 +import io.netty.channel.EventLoop;
 +import io.netty.util.concurrent.Future; // checkstyle: permit this import
 +import org.apache.cassandra.net.ConnectionCategory;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.OutboundConnectionInitiator.Result;
 +import org.apache.cassandra.net.OutboundConnectionInitiator.Result.StreamingSuccess;
 +import org.apache.cassandra.net.OutboundConnectionSettings;
 +import org.apache.cassandra.streaming.StreamingChannel;
 +
 +import static org.apache.cassandra.locator.InetAddressAndPort.getByAddress;
 +import static org.apache.cassandra.net.OutboundConnectionInitiator.initiateStreaming;
 +
 +public class NettyStreamingConnectionFactory implements StreamingChannel.Factory
 +{
 +    @VisibleForTesting
 +    public static int MAX_CONNECT_ATTEMPTS = 3;
 +
 +    public static NettyStreamingChannel connect(OutboundConnectionSettings template, int messagingVersion, StreamingChannel.Kind kind) throws IOException
 +    {
 +        EventLoop eventLoop = MessagingService.instance().socketFactory.outboundStreamingGroup().next();
 +
 +        int attempts = 0;
 +        while (true)
 +        {
 +            Future<Result<StreamingSuccess>> result = initiateStreaming(eventLoop, template.withDefaults(ConnectionCategory.STREAMING), messagingVersion);
 +            result.awaitUninterruptibly(); // initiate has its own timeout, so this is "guaranteed" to return relatively promptly
 +            if (result.isSuccess())
 +            {
 +                Channel channel = result.getNow().success().channel;
 +                NettyStreamingChannel streamingChannel = new NettyStreamingChannel(messagingVersion, channel, kind);
 +                if (kind == StreamingChannel.Kind.CONTROL)
 +                {
 +                    ChannelPipeline pipeline = channel.pipeline();
 +                    pipeline.addLast("stream", streamingChannel);
 +                }
 +                return streamingChannel;
 +            }
 +
 +            if (++attempts == MAX_CONNECT_ATTEMPTS)
 +                throw new IOException("failed to connect to " + template.to + " for streaming data", result.cause());
 +        }
 +    }
 +
 +    @Override
 +    public StreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind) throws IOException
 +    {
 +        return connect(new OutboundConnectionSettings(getByAddress(to)), messagingVersion, kind);
 +    }
++
++    @Override
++    public StreamingChannel create(InetSocketAddress to,
++                                   InetSocketAddress preferred,
++                                   int messagingVersion,
++                                   StreamingChannel.Kind kind) throws IOException
++    {
++        return connect(new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred)), messagingVersion, kind);
++    }
 +}
diff --cc src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
index 4f3a443515,0000000000..c2e551edb6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.java
@@@ -1,435 -1,0 +1,449 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.streaming.async;
 +
 +import java.io.IOError;
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.nio.channels.ClosedByInterruptException;
 +import java.util.Collection;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ScheduledFuture;
 +
 +import javax.annotation.Nullable;
 +
- import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.streaming.StreamDeserializingTask;
- import org.apache.cassandra.streaming.StreamingChannel;
- import org.apache.cassandra.streaming.StreamingDataOutputPlus;
- import org.apache.cassandra.utils.concurrent.ImmediateFuture;
- import org.apache.cassandra.utils.concurrent.Semaphore;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelFuture;
 +import io.netty.util.concurrent.Future; // checkstyle: permit this import
 +import org.apache.cassandra.concurrent.ExecutorPlus;
++import org.apache.cassandra.db.SystemKeyspace;
++import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.streaming.StreamDeserializingTask;
++import org.apache.cassandra.streaming.StreamingChannel;
++import org.apache.cassandra.streaming.StreamingDataOutputPlus;
 +import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
 +import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 +import org.apache.cassandra.streaming.messages.StreamMessage;
++import org.apache.cassandra.utils.concurrent.ImmediateFuture;
++import org.apache.cassandra.utils.concurrent.Semaphore;
 +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 +
 +import static com.google.common.base.Throwables.getRootCause;
 +import static java.lang.Integer.parseInt;
 +import static java.lang.String.format;
 +import static java.lang.System.getProperty;
 +import static java.lang.Thread.currentThread;
 +import static java.util.concurrent.TimeUnit.*;
++
 +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 +import static org.apache.cassandra.config.Config.PROPERTY_PREFIX;
 +import static org.apache.cassandra.streaming.StreamSession.createLogTag;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.serialize;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.serializedSize;
 +import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 +import static org.apache.cassandra.utils.FBUtilities.getAvailableProcessors;
 +import static org.apache.cassandra.utils.JVMStabilityInspector.inspectThrowable;
 +import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;
 +import static org.apache.cassandra.utils.concurrent.Semaphore.newFairSemaphore;
 +
- import static org.apache.cassandra.utils.Clock.Global.nanoTime;
- 
 +/**
 + * Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s
 + * for sending {@link OutgoingStreamMessage} instances; all other {@link StreamMessage} types are sent via
 + * a special control channel. The reason for this is to treat those messages carefully and not let them get stuck
 + * behind a stream transfer.
 + *
 + * One of the challenges when sending streams is we might need to delay shipping the stream if:
 + *
 + * - we've exceeded our network I/O use due to rate limiting (at the cassandra level)
 + * - the receiver isn't keeping up, which causes the local TCP socket buffer to not empty, which causes epoll writes to not
 + * move any bytes to the socket, which causes buffers to stick around in user-land (a/k/a cassandra) memory.
 + *
 + * When those conditions occur, it's easy enough to reschedule processing the stream once the resources pick up
 + * (we acquire the permits from the rate limiter, or the socket drains). However, we need to ensure that
 + * no other messages are submitted to the same channel while the current stream is still being processed.
 + */
 +public class StreamingMultiplexedChannel
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(StreamingMultiplexedChannel.class);
 +
 +    private static final int DEFAULT_MAX_PARALLEL_TRANSFERS = getAvailableProcessors();
 +    private static final int MAX_PARALLEL_TRANSFERS = parseInt(getProperty(PROPERTY_PREFIX + "streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS)));
 +
 +    // a simple mechanism for allowing a degree of fairness across multiple sessions
 +    private static final Semaphore fileTransferSemaphore = newFairSemaphore(DEFAULT_MAX_PARALLEL_TRANSFERS);
 +
 +    private final StreamingChannel.Factory factory;
 +    private final InetAddressAndPort to;
 +    private final StreamSession session;
 +    private final int messagingVersion;
 +
 +    private volatile boolean closed;
 +
 +    /**
 +     * A special {@link Channel} for sending non-stream streaming messages, basically anything that isn't an
 +     * {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but a node doesn't send that, it's only received).
 +     */
 +    private volatile StreamingChannel controlChannel;
 +
 +    // note: this really doesn't need to be a LBQ, just something that's thread safe
 +    private final Collection<ScheduledFuture<?>> channelKeepAlives = newBlockingQueue();
 +
 +    private final ExecutorPlus fileTransferExecutor;
 +
 +    /**
 +     * A mapping of each {@link #fileTransferExecutor} thread to a channel that can be written to (on that thread).
 +     */
 +    private final ConcurrentMap<Thread, StreamingChannel> threadToChannelMap = new ConcurrentHashMap<>();
 +
 +    public StreamingMultiplexedChannel(StreamSession session, StreamingChannel.Factory factory, InetAddressAndPort to, @Nullable StreamingChannel controlChannel, int messagingVersion)
 +    {
 +        this.session = session;
 +        this.factory = factory;
 +        this.to = to;
 +        this.messagingVersion = messagingVersion;
 +        this.controlChannel = controlChannel;
 +
 +        String name = session.peer.toString().replace(':', '.');
 +        fileTransferExecutor = executorFactory()
 +                .configurePooled("NettyStreaming-Outbound-" + name, MAX_PARALLEL_TRANSFERS)
 +                .withKeepAlive(1L, SECONDS).build();
 +    }
 +
 +
 +
 +    public InetAddressAndPort peer()
 +    {
 +        return to;
 +    }
 +
 +    public InetSocketAddress connectedTo()
 +    {
 +        return controlChannel == null ? to : controlChannel.connectedTo();
 +    }
 +
 +    /**
 +     * Used by initiator to setup control message channel connecting to follower
 +     */
 +    private void setupControlMessageChannel() throws IOException
 +    {
 +        if (controlChannel == null)
 +        {
 +            /*
 +             * Inbound handlers are needed:
 +             *  a) for initiator's control channel (the first outbound channel) to receive follower's message.
 +             *  b) for streaming receiver (note: both initiator and follower can receive streaming files) to receive files,
 +             *     in {@link Handler#setupStreamingPipeline}
 +             */
-             controlChannel = createChannel(StreamingChannel.Kind.CONTROL);
++            controlChannel = createControlChannel();
 +        }
 +    }
 +
-     private StreamingChannel createChannel(StreamingChannel.Kind kind) throws IOException
++    private StreamingChannel createControlChannel() throws IOException
 +    {
 +        logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
 +
-         StreamingChannel channel = factory.create(to, messagingVersion, kind);
-         if (kind == StreamingChannel.Kind.CONTROL)
-         {
-             executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
-                                           new StreamDeserializingTask(session, channel, messagingVersion));
-             session.attachInbound(channel);
-         }
++        StreamingChannel channel = factory.create(to, messagingVersion, StreamingChannel.Kind.CONTROL);
++        executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", to.toString(), channel.id()),
++                                      new StreamDeserializingTask(session, channel, messagingVersion));
++        session.attachInbound(channel);
 +        session.attachOutbound(channel);
 +
-         logger.debug("Creating {}", channel.description());
++        logger.debug("Creating control {}", channel.description());
++        return channel;
++    }
++    
++    private StreamingChannel createFileChannel(InetAddressAndPort connectTo) throws IOException
++    {
++        logger.debug("Creating stream session to {} as {}", to, session.isFollower() ? "follower" : "initiator");
++
++        StreamingChannel channel = factory.create(to, connectTo, messagingVersion, StreamingChannel.Kind.FILE);
++        session.attachOutbound(channel);
++
++        logger.debug("Creating file {}", channel.description());
 +        return channel;
 +    }
 +
 +    public Future<?> sendControlMessage(StreamMessage message)
 +    {
 +        try
 +        {
 +            setupControlMessageChannel();
 +            return sendMessage(controlChannel, message);
 +        }
 +        catch (Exception e)
 +        {
 +            close();
 +            session.onError(e);
 +            return ImmediateFuture.failure(e);
 +        }
 +
 +    }
 +    public Future<?> sendMessage(StreamingChannel channel, StreamMessage message)
 +    {
 +        if (closed)
 +            throw new RuntimeException("stream has been closed, cannot send " + message);
 +
 +        if (message instanceof OutgoingStreamMessage)
 +        {
 +            if (session.isPreview())
 +                throw new RuntimeException("Cannot send stream data messages for preview streaming sessions");
 +            if (logger.isDebugEnabled())
 +                logger.debug("{} Sending {}", createLogTag(session), message);
-             return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage)message));
++
++            InetAddressAndPort connectTo = SystemKeyspace.getPreferredIP(to);
++            return fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage) message, connectTo));
 +        }
 +
 +        try
 +        {
 +            Future<?> promise = channel.send(outSupplier -> {
 +                // we anticipate that the control messages are rather small, so allocating a ByteBuf shouldn't blow out of memory.
 +                long messageSize = serializedSize(message, messagingVersion);
 +                if (messageSize > 1 << 30)
 +                {
 +                    throw new IllegalStateException(format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s",
 +                                                           createLogTag(session, controlChannel.id()), messageSize, message.type));
 +                }
 +                try (StreamingDataOutputPlus out = outSupplier.apply((int) messageSize))
 +                {
 +                    StreamMessage.serialize(message, out, messagingVersion, session);
 +                }
 +            });
 +            promise.addListener(future -> onMessageComplete(future, message));
 +            return promise;
 +        }
 +        catch (Exception e)
 +        {
 +            close();
 +            session.onError(e);
 +            return ImmediateFuture.failure(e);
 +        }
 +    }
 +
 +    /**
 +     * Decides what to do after a {@link StreamMessage} is processed.
 +     *
 +     * Note: this is called from the netty event loop.
 +     *
 +     * @return null if the message was processed successfully; else, a {@link java.util.concurrent.Future} to indicate
 +     * the status of aborting any remaining tasks in the session.
 +     */
 +    Future<?> onMessageComplete(Future<?> future, StreamMessage msg)
 +    {
 +        Throwable cause = future.cause();
 +        if (cause == null)
 +            return null;
 +
 +        Channel channel = future instanceof ChannelFuture ? ((ChannelFuture)future).channel() : null;
 +        logger.error("{} failed to send a stream message/data to peer {}: msg = {}",
 +                     createLogTag(session, channel), to, msg, future.cause());
 +
 +        // StreamSession will invoke close(), but we have to mark this sender as closed so the session doesn't try
 +        // to send any failure messages
 +        return session.onError(cause);
 +    }
 +
 +    class FileStreamTask implements Runnable
 +    {
 +        /**
 +         * Time interval, in minutes, to wait between logging a message indicating that we're waiting on a semaphore
 +         * permit to become available.
 +         */
 +        private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
 +
 +        /**
 +         * Even though we expect only an {@link OutgoingStreamMessage} at runtime, the type here is {@link StreamMessage}
 +         * to facilitate simpler testing.
 +         */
 +        private final StreamMessage msg;
 +
-         FileStreamTask(OutgoingStreamMessage ofm)
++        private final InetAddressAndPort connectTo;
++
++        FileStreamTask(OutgoingStreamMessage ofm, InetAddressAndPort connectTo)
 +        {
 +            this.msg = ofm;
++            this.connectTo = connectTo;
 +        }
 +
 +        /**
 +         * For testing purposes
 +         */
 +        FileStreamTask(StreamMessage msg)
 +        {
 +            this.msg = msg;
++            this.connectTo = null;
 +        }
 +
 +        @Override
 +        public void run()
 +        {
 +            if (!acquirePermit(SEMAPHORE_UNAVAILABLE_LOG_INTERVAL))
 +                return;
 +
 +            StreamingChannel channel = null;
 +            try
 +            {
-                 channel = getOrCreateChannel();
++                channel = getOrCreateFileChannel(connectTo);
 +
 +                // close the DataOutputStreamPlus as we're done with it - but don't close the channel
 +                try (StreamingDataOutputPlus out = channel.acquireOut())
 +                {
 +                    serialize(msg, out, messagingVersion, session);
 +                }
 +            }
 +            catch (Exception e)
 +            {
 +                session.onError(e);
 +            }
 +            catch (Throwable t)
 +            {
 +                if (closed && getRootCause(t) instanceof ClosedByInterruptException && fileTransferExecutor.isShutdown())
 +                {
 +                    logger.debug("{} Streaming channel was closed due to the executor pool being shutdown", createLogTag(session, channel));
 +                }
 +                else
 +                {
 +                    inspectThrowable(t);
 +                    if (!session.state().isFinalState())
 +                        session.onError(t);
 +                }
 +            }
 +            finally
 +            {
 +                fileTransferSemaphore.release(1);
 +            }
 +        }
 +
 +        boolean acquirePermit(int logInterval)
 +        {
 +            long logIntervalNanos = MINUTES.toNanos(logInterval);
 +            long timeOfLastLogging = nanoTime();
 +            while (true)
 +            {
 +                if (closed)
 +                    return false;
 +                try
 +                {
 +                    if (fileTransferSemaphore.tryAcquire(1, 1, SECONDS))
 +                        return true;
 +
 +                    // log a helpful message to operators in case they are wondering why a given session might not be making progress.
 +                    long now = nanoTime();
 +                    if (now - timeOfLastLogging > logIntervalNanos)
 +                    {
 +                        timeOfLastLogging = now;
 +                        OutgoingStreamMessage ofm = (OutgoingStreamMessage)msg;
 +
 +                        if (logger.isInfoEnabled())
 +                            logger.info("{} waiting to acquire a permit to begin streaming {}. This message logs every {} minutes",
 +                                        createLogTag(session), ofm.getName(), logInterval);
 +                    }
 +                }
 +                catch (InterruptedException e)
 +                {
 +                    throw new UncheckedInterruptedException(e);
 +                }
 +            }
 +        }
 +
-         private StreamingChannel getOrCreateChannel()
++        private StreamingChannel getOrCreateFileChannel(InetAddressAndPort connectTo)
 +        {
 +            Thread currentThread = currentThread();
 +            try
 +            {
 +                StreamingChannel channel = threadToChannelMap.get(currentThread);
 +                if (channel != null)
 +                    return channel;
 +
-                 channel = createChannel(StreamingChannel.Kind.FILE);
++                channel = createFileChannel(connectTo);
 +                threadToChannelMap.put(currentThread, channel);
 +                return channel;
 +            }
 +            catch (Exception e)
 +            {
 +                throw new IOError(e);
 +            }
 +        }
 +
 +        /**
 +         * For testing purposes
 +         */
 +        void injectChannel(StreamingChannel channel)
 +        {
 +            Thread currentThread = currentThread();
 +            if (threadToChannelMap.get(currentThread) != null)
 +                throw new IllegalStateException("previous channel already set");
 +
 +            threadToChannelMap.put(currentThread, channel);
 +        }
 +
 +        /**
 +         * For testing purposes
 +         */
 +        void unsetChannel()
 +        {
 +            threadToChannelMap.remove(currentThread());
 +        }
 +    }
 +
 +    /**
 +     * For testing purposes only.
 +     */
 +    public void setClosed()
 +    {
 +        closed = true;
 +    }
 +
 +    void setControlChannel(NettyStreamingChannel channel)
 +    {
 +        controlChannel = channel;
 +    }
 +
 +    int semaphoreAvailablePermits()
 +    {
 +        return fileTransferSemaphore.permits();
 +    }
 +
 +    public boolean connected()
 +    {
 +        return !closed && (controlChannel == null || controlChannel.connected());
 +    }
 +
 +    public void close()
 +    {
 +        if (closed)
 +            return;
 +
 +        closed = true;
 +        if (logger.isDebugEnabled())
 +            logger.debug("{} Closing stream connection channels on {}", createLogTag(session), to);
 +        for (ScheduledFuture<?> future : channelKeepAlives)
 +            future.cancel(false);
 +        channelKeepAlives.clear();
 +
 +        threadToChannelMap.values().forEach(StreamingChannel::close);
 +        threadToChannelMap.clear();
 +        fileTransferExecutor.shutdownNow();
 +    }
 +}
diff --cc src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index 354ed941c9,7db2aa68b3..eb34cef1ef
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@@ -43,12 -40,12 +43,29 @@@ public class BulkLoadConnectionFactory 
          this.outboundBindAny = outboundBindAny;
      }
  
 -    public Channel createConnection(OutboundConnectionSettings template, int messagingVersion) throws IOException
++    @Override
 +    public NettyStreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind) throws IOException
++    {
++        OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to));
++        return create(template, messagingVersion, kind);
++    }
++
++    @Override
++    public StreamingChannel create(InetSocketAddress to,
++                                   InetSocketAddress preferred,
++                                   int messagingVersion,
++                                   StreamingChannel.Kind kind) throws IOException
++    {
++        // Supply a preferred address to the template, which will be overwritten if encryption is configured.
++        OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to), getByAddress(preferred));
++        return create(template, messagingVersion, kind);
++    }
++
++    private NettyStreamingChannel create(OutboundConnectionSettings template, int messagingVersion, StreamingChannel.Kind kind) throws IOException
      {
          // Connect to secure port for all peers if ServerEncryptionOptions is configured other than 'none'
          // When 'all', 'dc' and 'rack', server nodes always have SSL port open, and since thin client like sstableloader
--        // does not know which node is in which dc/rack, connecting to SSL port is always the option.
-         OutboundConnectionSettings template = new OutboundConnectionSettings(getByAddress(to));
 -
++        // does not know which node is in which dc/rack, connecting to SSL port is always the option. 
          if (encryptionOptions != null && encryptionOptions.internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none)
              template = template.withConnectTo(template.to.withPort(secureStoragePort)).withEncryption(encryptionOptions);
  
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java
index c3c2b1427d,37c9171542..74a5c6f51d
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairErrorsTest.java
@@@ -27,25 -27,24 +27,30 @@@ import net.bytebuddy.ByteBuddy
  import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
  import net.bytebuddy.implementation.MethodDelegation;
  import net.bytebuddy.implementation.bind.annotation.SuperCall;
 +import org.assertj.core.api.Assertions;
  import org.junit.Test;
  
 -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 +import org.apache.cassandra.db.compaction.CompactionIterator;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.db.streaming.CassandraIncomingFile;
  import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
  import org.apache.cassandra.distributed.api.NodeToolResult;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.locator.RangesAtEndpoint;
  import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.utils.TimeUUID;
  
  import static net.bytebuddy.matcher.ElementMatchers.named;
 -import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
  
@@@ -54,34 -53,6 +59,33 @@@ import static org.apache.cassandra.dist
  
  public class RepairErrorsTest extends TestBaseImpl
  {
 +    @Test
 +    public void testRemoteValidationFailure() throws IOException
 +    {
 +        Cluster.Builder builder = Cluster.build(2)
 +                                         .withConfig(config -> config.with(GOSSIP).with(NETWORK))
 +                                         .withInstanceInitializer(ByteBuddyHelper::install);
 +        try (Cluster cluster = builder.createWithoutStarting())
 +        {
 +            cluster.setUncaughtExceptionsFilter((i, throwable) -> {
 +                if (i == 2)
 +                    return throwable.getMessage() != null && throwable.getMessage().contains("IGNORE");
 +                return false;
 +            });
 +
 +            cluster.startup();
 +            init(cluster);
 +
 +            cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)");
 +            for (int i = 0; i < 10; i++)
 +                cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i);
 +            cluster.forEach(i -> i.flush(KEYSPACE));
 +            long mark = cluster.get(1).logs().mark();
 +            cluster.forEach(i -> i.nodetoolResult("repair", "--full").asserts().failure());
 +            Assertions.assertThat(cluster.get(1).logs().grep(mark, "^ERROR").getResult()).isEmpty();
 +        }
 +    }
 +
-     @SuppressWarnings("Convert2MethodRef")
      @Test
      public void testRemoteSyncFailure() throws Exception
      {
@@@ -124,62 -95,65 +128,111 @@@
              result = cluster.get(1).nodetoolResult("repair", KEYSPACE);
              result.asserts().success();
  
-             // Make sure we've cleaned up sessions and parent sessions:
-             Integer parents = cluster.get(1).callOnInstance(() -> ActiveRepairService.instance.parentRepairSessionCount());
-             assertEquals(0, parents.intValue());
-             Integer sessions = cluster.get(1).callOnInstance(() -> ActiveRepairService.instance.sessionCount());
-             assertEquals(0, sessions.intValue());
+             assertNoActiveRepairSessions(cluster.get(1));
 +
 +            cluster.forEach(i -> Assertions.assertThat(i.logs().grep("SomeRepairFailedException").getResult())
 +                                           .describedAs("node%d logged hidden exception org.apache.cassandra.repair.SomeRepairFailedException", i.config().num())
 +                                           .isEmpty());
          }
      }
  
+     @Test
+     public void testRemoteStreamFailure() throws Exception
+     {
+         try (Cluster cluster = init(Cluster.build(3)
+                                            .withConfig(config -> config.with(GOSSIP, NETWORK)
+                                                                        .set("disk_failure_policy", "stop")
+                                                                        .set("disk_access_mode", "mmap_index_only"))
+                                            .withInstanceInitializer(ByteBuddyHelperStreamFailure::installStreamHandlingFailure).start()))
+         {
+             // Make sure we don't auto-compact the peers table. We'll need to try it manually later.
+             cluster.get(1).runOnInstance(() -> {
+                 ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers_v2");
+                 cfs.disableAutoCompaction();
+             });
+ 
+             cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, x int)");
+ 
+             // On repair, this data layout will require two (local) syncs from node 1 and one remote sync from node 2:
+             cluster.get(1).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 1, 1);
+             cluster.get(2).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 2, 2);
+             cluster.get(3).executeInternal("insert into " + KEYSPACE + ".tbl (id, x) VALUES (?,?)", 3, 3);
+             cluster.forEach(i -> i.flush(KEYSPACE));
+ 
+             // Flush system.peers_v2, or there won't be any SSTables...
+             cluster.forEach(i -> i.flush("system"));
+ 
+             // Stream reading will fail on node 3, and this will interrupt node 1 just as it starts to stream to node 2.
+             NodeToolResult result = cluster.get(1).nodetoolResult("repair", KEYSPACE);
+             result.asserts().failure();
+ 
+             // Ensure that the peers table is compactable even after the file streaming task is interrupted.
+             cluster.get(1).runOnInstance(() -> {
+                 ColumnFamilyStore cfs = Keyspace.open("system").getColumnFamilyStore("peers_v2");
+                 cfs.forceMajorCompaction();
+             });
+ 
+             assertTrue(cluster.get(1).logs().grep("Stopping transports as disk_failure_policy is stop").getResult().isEmpty());
+             assertTrue(cluster.get(1).logs().grep("FSReadError").getResult().isEmpty());
+ 
+             assertNoActiveRepairSessions(cluster.get(1));
+         }
+     }
+ 
 +    @Test
 +    public void testNoSuchRepairSessionAnticompaction() throws IOException
 +    {
 +        try (Cluster cluster = init(Cluster.build(2)
 +                                           .withConfig(config -> config.with(GOSSIP).with(NETWORK))
 +                                           .withInstanceInitializer(ByteBuddyHelper::installACNoSuchRepairSession)
 +                                           .start()))
 +        {
 +            cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, x int)");
 +            for (int i = 0; i < 10; i++)
 +                cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, x) VALUES (?,?)", ConsistencyLevel.ALL, i, i);
 +            cluster.forEach(i -> i.flush(KEYSPACE));
 +            long mark = cluster.get(1).logs().mark();
 +            cluster.forEach(i -> i.nodetoolResult("repair", KEYSPACE).asserts().failure());
 +            assertTrue(cluster.get(1).logs().grep(mark, "^ERROR").getResult().isEmpty());
 +        }
 +    }
 +
+     @SuppressWarnings("Convert2MethodRef")
+     private void assertNoActiveRepairSessions(IInvokableInstance instance)
+     {
+         // Make sure we've cleaned up sessions and parent sessions:
+         Integer parents = instance.callOnInstance(() -> ActiveRepairService.instance.parentRepairSessionCount());
+         assertEquals(0, parents.intValue());
+         Integer sessions = instance.callOnInstance(() -> ActiveRepairService.instance.sessionCount());
+         assertEquals(0, sessions.intValue());
+     }
+ 
      public static class ByteBuddyHelper
      {
 +        public static void install(ClassLoader cl, int nodeNumber)
 +        {
 +            if (nodeNumber == 2)
 +            {
 +                new ByteBuddy().redefine(CompactionIterator.class)
 +                               .method(named("next"))
 +                               .intercept(MethodDelegation.to(ByteBuddyHelper.class))
 +                               .make()
 +                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
 +            }
 +        }
 +
 +        public static void installACNoSuchRepairSession(ClassLoader cl, int nodeNumber)
 +        {
 +            if (nodeNumber == 2)
 +            {
 +                new ByteBuddy().redefine(CompactionManager.class)
 +                               .method(named("validateSSTableBoundsForAnticompaction"))
 +                               .intercept(MethodDelegation.to(ByteBuddyHelper.class))
 +                               .make()
 +                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
 +            }
 +        }
 +        
          public static void installStreamPlanExecutionFailure(ClassLoader cl, int nodeNumber)
          {
              if (nodeNumber == 2)
@@@ -238,5 -205,64 +291,54 @@@
  
              return zuper.call();
          }
 -
 -        @SuppressWarnings({"unused", "ResultOfMethodCallIgnored"})
 -        public static Throwable extractThrowable(Future<?> future, @SuperCall Callable<Throwable> zuper) throws Exception
 -        {
 -            if (Thread.currentThread().getName().contains("RepairJobTask"))
 -                // Clear the interrupt flag so the FSReadError is propagated correctly in DebuggableThreadPoolExecutor:
 -                Thread.interrupted();
 -            
 -            return zuper.call();
 -        }
      }
+ 
+     public static class ByteBuddyHelperStreamFailure
+     {
+         public static void installStreamHandlingFailure(ClassLoader cl, int nodeNumber)
+         {
+             if (nodeNumber == 3)
+             {
+                 new ByteBuddy().rebase(CassandraIncomingFile.class)
 -                        .method(named("read"))
 -                        .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class))
 -                        .make()
 -                        .load(cl, ClassLoadingStrategy.Default.INJECTION);
++                               .method(named("read"))
++                               .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class))
++                               .make()
++                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
+             }
+ 
+             if (nodeNumber == 1)
+             {
+                 new ByteBuddy().rebase(SystemKeyspace.class)
 -                        .method(named("getPreferredIP"))
 -                        .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class))
 -                        .make()
 -                        .load(cl, ClassLoadingStrategy.Default.INJECTION);
++                               .method(named("getPreferredIP"))
++                               .intercept(MethodDelegation.to(ByteBuddyHelperStreamFailure.class))
++                               .make()
++                               .load(cl, ClassLoadingStrategy.Default.INJECTION);
+             }
+         }
+ 
+         @SuppressWarnings("unused")
+         public static void read(DataInputPlus in, int version) throws IOException
+         {
+             throw new IOException("Failing incoming file read from test!");
+         }
+ 
+         @SuppressWarnings("unused")
+         public static InetAddressAndPort getPreferredIP(InetAddressAndPort ep, @SuperCall Callable<InetAddressAndPort> zuper) throws Exception
+         {
 -            if (Thread.currentThread().getName().contains("NettyStreaming-Outbound") && ep.address.toString().contains("127.0.0.2"))
++            if (Thread.currentThread().getName().contains("NettyStreaming-Outbound") && ep.getAddress().toString().contains("127.0.0.2"))
+             {
+                 try
+                 {
+                     TimeUnit.SECONDS.sleep(10);
+                 }
+                 catch (InterruptedException e)
+                 {
+                     // Leave the interrupt flag intact for the ChannelProxy downstream...
+                     Thread.currentThread().interrupt();
+                 }
+             }
+ 
+             return zuper.call();
+         }
+     }
  }
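
Note on the getPreferredIP interceptor above: when sleep() throws InterruptedException it clears the thread's interrupt flag, so the catch block sets the flag again before delegating. That way the code downstream still sees the interruption. Below is a minimal standalone sketch of that pattern in plain Java. The class name InterruptPreservingDelay and the helper delayThenCall are hypothetical, chosen only for illustration; they are not part of Cassandra or of this commit.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of the Cassandra code base.
public class InterruptPreservingDelay
{
    // Sleeps for the given time, then runs the delegate. If the sleep is
    // interrupted, the interrupt flag is restored before the delegate runs.
    public static <T> T delayThenCall(long millis, Callable<T> delegate) throws Exception
    {
        try
        {
            TimeUnit.MILLISECONDS.sleep(millis);
        }
        catch (InterruptedException e)
        {
            // sleep() cleared the flag when it threw; set it again so the
            // delegate (and anything after it) can still observe the interrupt.
            Thread.currentThread().interrupt();
        }

        return delegate.call();
    }

    public static void main(String[] args) throws Exception
    {
        // Simulate an interrupt arriving before the delay starts.
        Thread.currentThread().interrupt();

        String result = delayThenCall(10_000, () -> "delegate ran");

        // The sleep returns immediately and the flag is still set afterwards.
        System.out.println(result + ", interrupted=" + Thread.currentThread().isInterrupted());

        // Clear the flag so the JVM shuts down with a clean main thread.
        Thread.interrupted();
    }
}
```

Swallowing the exception without calling interrupt() would hide the interruption from the delegate, which is the situation the comment in the test helper is guarding against.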

