You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2022/02/04 19:37:10 UTC

[cassandra] branch trunk updated: When streaming sees a ClosedChannelException this triggers the disk failure policy

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new df16b37  When streaming sees a ClosedChannelException this triggers the disk failure policy
df16b37 is described below

commit df16b3750dc2c1b6b9bcdece6f81dfd3de7ebdfa
Author: David Capwell <dc...@apache.org>
AuthorDate: Fri Feb 4 10:15:58 2022 -0800

    When streaming sees a ClosedChannelException this triggers the disk failure policy
    
    patch by David Capwell, Francisco Guerrero; reviewed by Caleb Rackliffe, Dinesh Joshi for CASSANDRA-17116
---
 CHANGES.txt                                        |   1 +
 .../streaming/CassandraCompressedStreamReader.java |  13 +-
 .../CassandraEntireSSTableStreamReader.java        |  10 +-
 .../db/streaming/CassandraIncomingFile.java        |   4 +-
 .../db/streaming/CassandraStreamReader.java        |  40 +++--
 .../cassandra/db/streaming/ComponentManifest.java  |   8 +
 .../cassandra/db/streaming/IStreamReader.java      |   4 +-
 .../org/apache/cassandra/io/FSErrorHandler.java    |   1 +
 .../sstable/format/big/BigTableZeroCopyWriter.java |  12 +-
 .../org/apache/cassandra/io/util/FileUtils.java    |   5 +
 .../apache/cassandra/metrics/StreamingMetrics.java |  10 ++
 .../cassandra/service/DefaultFSErrorHandler.java   |   3 +-
 .../cassandra/service/NativeTransportService.java  |   3 +-
 .../apache/cassandra/service/StorageService.java   |   4 +-
 .../apache/cassandra/streaming/IncomingStream.java |   4 +-
 .../streaming/StreamDeserializingTask.java         |   8 +-
 .../streaming/StreamReceiveException.java          |   4 +-
 .../cassandra/streaming/StreamResultFuture.java    |   2 +-
 .../apache/cassandra/streaming/StreamSession.java  |  70 +++++---
 .../streaming/messages/IncomingStreamMessage.java  |   5 +-
 .../cassandra/utils/JVMStabilityInspector.java     |  15 +-
 .../distributed/impl/AbstractCluster.java          |  16 +-
 .../cassandra/distributed/impl/Instance.java       |   2 +-
 .../cassandra/distributed/impl/InstanceKiller.java |   9 +
 .../distributed/test/FailingRepairTest.java        |  20 ++-
 .../cassandra/distributed/test/StreamingTest.java  |   5 +-
 .../test/streaming/StreamCloseInMiddleTest.java    | 196 +++++++++++++++++++++
 .../distributed/upgrade/MixedModeGossipTest.java   |   8 +-
 .../upgrade/MixedModeMessageForwardTest.java       |   2 +-
 .../distributed/upgrade/MixedModeReadTest.java     |   4 +-
 .../distributed/upgrade/MixedModeRepairTest.java   |   2 +-
 .../upgrade/Pre40MessageFilterTest.java            |   2 +-
 .../distributed/upgrade/UpgradeTestBase.java       |  12 +-
 .../microbench/ZeroCopyStreamingBenchmark.java     |   4 +-
 .../CassandraEntireSSTableStreamWriterTest.java    |   2 +-
 ...TableStreamConcurrentComponentMutationTest.java |   8 +-
 .../format/big/BigTableZeroCopyWriterTest.java     |  11 +-
 .../cassandra/utils/JVMStabilityInspectorTest.java |  82 ++++++---
 38 files changed, 461 insertions(+), 150 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index fe3fb22..a16f5f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * When streaming sees a ClosedChannelException this triggers the disk failure policy (CASSANDRA-17116)
  * Add a virtual table for exposing prepared statements metrics (CASSANDRA-17224)
  * Remove python 2.x support from cqlsh (CASSANDRA-17242)
  * Prewarm role and credential caches to avoid timeouts at startup (CASSANDRA-16958)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index cfa9018..dda874b 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.streaming;
 
 import java.io.IOException;
 
-import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +33,6 @@ import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
 /**
  * CassandraStreamReader that reads from streamed compressed SSTable
  */
@@ -57,7 +54,7 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
      */
     @Override
     @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
-    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable
     {
         long totalSize = totalSize();
 
@@ -110,12 +107,8 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
             logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
                         session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
             if (writer != null)
-            {
-                writer.abort(e);
-            }
-            if (extractIOExceptionCause(e).isPresent())
-                throw e;
-            throw Throwables.propagate(e);
+                e = writer.abort(e);
+            throw e;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index e547e0f..261c59e 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -22,14 +22,12 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.function.UnaryOperator;
 
-import com.google.common.base.Throwables;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.io.util.File;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -37,6 +35,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReceiver;
@@ -85,7 +84,7 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader
      */
     @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
     @Override
-    public SSTableMultiWriter read(DataInputPlus in) throws IOException
+    public SSTableMultiWriter read(DataInputPlus in) throws Throwable
     {
         ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
         if (cfs == null)
@@ -147,8 +146,7 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader
             logger.error("[Stream {}] Error while reading sstable from stream for table = {}", session.planId(), cfs.metadata(), e);
             if (writer != null)
                 e = writer.abort(e);
-            Throwables.throwIfUnchecked(e);
-            throw new RuntimeException(e);
+            throw e;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index 11a18a0..74946a8 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.db.streaming;
 
-import java.io.IOException;
 import java.util.Objects;
 
 import com.google.common.base.Preconditions;
@@ -64,10 +63,11 @@ public class CassandraIncomingFile implements IncomingStream
     }
 
     @Override
-    public synchronized void read(DataInputPlus in, int version) throws IOException
+    public synchronized void read(DataInputPlus in, int version) throws Throwable
     {
         CassandraStreamHeader streamHeader = CassandraStreamHeader.serializer.deserialize(in, version);
         logger.debug("Incoming stream entireSSTable={} components={}", streamHeader.isEntireSSTable, streamHeader.componentManifest);
+        session.countStreamedIn(streamHeader.isEntireSSTable);
 
         IStreamReader reader;
         if (streamHeader.isEntireSSTable)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 6835fad..7d39e83 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -17,31 +17,39 @@
  */
 package org.apache.cassandra.db.streaming;
 
-import java.io.*;
+import java.io.IOError;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.UUID;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.collect.UnmodifiableIterator;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.exceptions.UnknownColumnException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.TrackedDataInputPlus;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.exceptions.UnknownColumnException;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReceiver;
 import org.apache.cassandra.streaming.StreamSession;
@@ -98,16 +106,14 @@ public class CassandraStreamReader implements IStreamReader
      */
     @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
     @Override
-    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable
     {
         long totalSize = totalSize();
 
         ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
         if (cfs == null)
-        {
             // schema was dropped during streaming
-            throw new IOException("CF " + tableId + " was dropped during streaming");
-        }
+            throw new IllegalStateException("Table " + tableId + " was dropped during streaming");
 
         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
@@ -136,10 +142,8 @@ public class CassandraStreamReader implements IStreamReader
             logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
                         session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e);
             if (writer != null)
-            {
-                writer.abort(e);
-            }
-            throw Throwables.propagate(e);
+                e = writer.abort(e);
+            throw e;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
index 71aa0f8..b77b594 100644
--- a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -106,6 +106,14 @@ public final class ComponentManifest implements Iterable<Component>
         return components.hashCode();
     }
 
+    @Override
+    public String toString()
+    {
+        return "ComponentManifest{" +
+               "components=" + components +
+               '}';
+    }
+
     public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
     {
         public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException
diff --git a/src/java/org/apache/cassandra/db/streaming/IStreamReader.java b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
index cf93bc2..e7cb2a2 100644
--- a/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.db.streaming;
 
-import java.io.IOException;
-
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.util.DataInputPlus;
 
@@ -28,5 +26,5 @@ import org.apache.cassandra.io.util.DataInputPlus;
  */
 public interface IStreamReader
 {
-    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException;
+    SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable;
 }
diff --git a/src/java/org/apache/cassandra/io/FSErrorHandler.java b/src/java/org/apache/cassandra/io/FSErrorHandler.java
index 081ec0b..b7d2836 100644
--- a/src/java/org/apache/cassandra/io/FSErrorHandler.java
+++ b/src/java/org/apache/cassandra/io/FSErrorHandler.java
@@ -27,4 +27,5 @@ public interface FSErrorHandler
 {
     void handleCorruptSSTable(CorruptSSTableException e);
     void handleFSError(FSError e);
+    default void handleStartupFSError(Throwable t) {}
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
index 717e9d9..c47c23b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable.format.big;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.Map;
@@ -198,7 +199,7 @@ public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWrite
             writer.close();
     }
 
-    public void writeComponent(Component.Type type, DataInputPlus in, long size)
+    public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
     {
         logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
 
@@ -208,7 +209,7 @@ public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWrite
             write(in, size, componentWriters.get(type));
     }
 
-    private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer)
+    private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer) throws ClosedChannelException
     {
         logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
 
@@ -222,6 +223,13 @@ public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWrite
         {
             in.close();
         }
+        catch (ClosedChannelException e)
+        {
+            // FSWriteError triggers disk failure policy, but if we get a connection issue we do not want to do that
+            // so rethrow so the error handling logic higher up is able to deal with this
+            // see CASSANDRA-17116
+            throw e;
+        }
         catch (IOException e)
         {
             throw new FSWriteError(e, writer.getPath());
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 45abd7b..ea54497 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -463,6 +463,11 @@ public final class FileUtils
         fsErrorHandler.get().ifPresent(handler -> handler.handleFSError(e));
     }
 
+    public static void handleStartupFSError(Throwable t)
+    {
+        fsErrorHandler.get().ifPresent(handler -> handler.handleStartupFSError(t));
+    }
+
     /**
      * handleFSErrorAndPropagate will invoke the disk failure policy error handler,
      * which may or may not stop the daemon or transports. However, if we don't exit,
diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
index 54df233..e38b605 100644
--- a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
@@ -47,6 +47,8 @@ public class StreamingMetrics
     public final Counter outgoingBytes;
     /* Measures the time taken for processing the incoming stream message after being deserialized, including the time to flush to disk. */
     public final Timer incomingProcessTime;
+    private final Counter entireSSTablesStreamedIn;
+    private final Counter partialSSTablesStreamedIn;
 
     public static StreamingMetrics get(InetAddressAndPort ip)
     {
@@ -79,5 +81,13 @@ public class StreamingMetrics
         incomingBytes = Metrics.counter(factory.createMetricName("IncomingBytes"));
         outgoingBytes= Metrics.counter(factory.createMetricName("OutgoingBytes"));
         incomingProcessTime = Metrics.timer(factory.createMetricName("IncomingProcessTime"));
+
+        entireSSTablesStreamedIn = Metrics.counter(factory.createMetricName("EntireSSTablesStreamedIn"));
+        partialSSTablesStreamedIn = Metrics.counter(factory.createMetricName("PartialSSTablesStreamedIn"));
+    }
+
+    public void countStreamedIn(boolean isEntireSSTable)
+    {
+        (isEntireSSTable ? entireSSTablesStreamedIn : partialSSTablesStreamedIn).inc();
     }
 }
diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
index 04cb11c..6a9e15e 100644
--- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
+++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
@@ -94,7 +94,8 @@ public class DefaultFSErrorHandler implements FSErrorHandler
         }
     }
 
-    private static void handleStartupFSError(Throwable t)
+    @Override
+    public void handleStartupFSError(Throwable t)
     {
         switch (DatabaseDescriptor.getDiskFailurePolicy())
         {
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java
index 7556f81..f131d74 100644
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -149,7 +149,8 @@ public class NativeTransportService
         servers = Collections.emptyList();
 
         // shutdown executors used by netty for native transport server
-        workerGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS).awaitUninterruptibly();
+        if (workerGroup != null)
+            workerGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS).awaitUninterruptibly();
 
         Dispatcher.shutdown();
     }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index fcff144..d5b676f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -521,9 +521,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public boolean isDaemonSetupCompleted()
     {
-        return daemon == null
-               ? false
-               : daemon.setupCompleted();
+        return daemon != null && daemon.setupCompleted();
     }
 
     public void stopDaemon()
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java
index 0733249..25ab626 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStream.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.streaming;
 
-import java.io.IOException;
-
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.schema.TableId;
 
@@ -37,7 +35,7 @@ public interface IncomingStream
     /**
      * Read in the stream data.
      */
-    void read(DataInputPlus inputPlus, int version) throws IOException;
+    void read(DataInputPlus inputPlus, int version) throws Throwable;
 
     String getName();
     long getSize();
diff --git a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
index dd2678c..3785249 100644
--- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
@@ -18,10 +18,7 @@
 
 package org.apache.cassandra.streaming;
 
-import java.nio.channels.ClosedChannelException;
-
 import com.google.common.annotations.VisibleForTesting;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,9 +76,6 @@ public class StreamDeserializingTask implements Runnable
                 session.messageReceived(message);
             }
         }
-        catch (ClosedChannelException ignore)
-        {
-        }
         catch (Throwable t)
         {
             JVMStabilityInspector.inspectThrowable(t);
@@ -91,7 +85,7 @@ public class StreamDeserializingTask implements Runnable
             }
             else if (t instanceof StreamReceiveException)
             {
-                ((StreamReceiveException)t).session.onError(t);
+                ((StreamReceiveException)t).session.onError(t.getCause());
             }
             else
             {
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveException.java b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
index 54b365a..c564182 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
@@ -18,7 +18,9 @@
 
 package org.apache.cassandra.streaming;
 
-public class StreamReceiveException extends RuntimeException
+import java.io.IOException;
+
+public class StreamReceiveException extends IOException
 {
     public final StreamSession session;
 
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index f7a0b63..66e99be 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -191,7 +191,7 @@ public final class StreamResultFuture extends AsyncFuture<StreamState>
 
     void handleSessionComplete(StreamSession session)
     {
-        logger.info("[Stream #{}] Session with {} is complete", session.planId(), session.peer);
+        logger.info("[Stream #{}] Session with {} is {}", session.planId(), session.peer, session.state().name().toLowerCase());
         fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
         SessionInfo sessionInfo = session.getSessionInfo();
         coordinator.addSessionInfo(sessionInfo);
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index bcf19d5..fd24539 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.streaming;
 
 import java.io.EOFException;
 import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -55,7 +56,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 import static com.google.common.collect.Iterables.all;
-import static org.apache.cassandra.net.MessagingService.current_version;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static org.apache.cassandra.locator.InetAddressAndPort.hostAddressAndPort;
 import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
@@ -364,7 +364,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                                                               getPendingRepair(),
                                                               getPreviewKind());
 
-            channel.sendControlMessage(message);
+            channel.sendControlMessage(message).sync();
             onInitializationComplete();
         }
         catch (Exception e)
@@ -535,8 +535,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      */
     public void state(State newState)
     {
-        if (logger.isTraceEnabled())
-            logger.trace("[Stream #{}] Changing session state from {} to {}", planId(), state, newState);
+        if (logger.isDebugEnabled())
+            logger.debug("[Stream #{}] Changing session state from {} to {}", planId(), state, newState);
 
         sink.recordState(peer, newState);
         state = newState;
@@ -626,7 +626,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             prepare.summaries.add(task.getSummary());
         }
 
-        channel.sendControlMessage(prepare);
+        channel.sendControlMessage(prepare).syncUninterruptibly();
     }
 
     /**
@@ -636,9 +636,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      */
     public synchronized Future onError(Throwable e)
     {
-        boolean isEofException = e instanceof EOFException;
+        boolean isEofException = e instanceof EOFException || e instanceof ClosedChannelException;
         if (isEofException)
         {
+            State state = this.state;
             if (state.finalState)
             {
                 logger.debug("[Stream #{}] Socket closed after session completed with state {}", planId(), state);
@@ -659,7 +660,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         logError(e);
         // send session failure message
         if (channel.connected())
-            channel.sendControlMessage(new SessionFailedMessage());
+            channel.sendControlMessage(new SessionFailedMessage()).syncUninterruptibly();
         // fail session
         return closeSession(State.FAILED);
     }
@@ -703,6 +704,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         });
     }
 
+    public void countStreamedIn(boolean isEntireSSTable)
+    {
+        metrics.countStreamedIn(isEntireSSTable);
+    }
+
     /**
      * Finish preparing the session. This method is blocking (memtables are flushed in {@link #addTransferRanges}),
      * so the logic should not execute on the main IO thread (read: netty event loop).
@@ -718,9 +724,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         if (!peer.equals(FBUtilities.getBroadcastAddressAndPort()))
             for (StreamTransferTask task : transfers.values())
                 prepareSynAck.summaries.add(task.getSummary());
-        channel.sendControlMessage(prepareSynAck);
 
         streamResult.handleSessionPrepared(this);
+        // After sending the message the initiator can close the channel which will cause a ClosedChannelException
+        // in buffer logic, this then gets sent to onError which validates the state isFinalState, if not fails
+        // the session.  To avoid a race condition between sending and setting state, make sure to update the state
+        // before sending the message (without closing the channel)
+        // see CASSANDRA-17116
+        if (isPreview())
+            state(State.COMPLETE);
+        channel.sendControlMessage(prepareSynAck).syncUninterruptibly();
 
         if (isPreview())
             completePreview();
@@ -737,7 +750,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
             // only send the (final) ACK if we are expecting the peer to send this node (the initiator) some files
             if (!isPreview())
-                channel.sendControlMessage(new PrepareAckMessage());
+                channel.sendControlMessage(new PrepareAckMessage()).syncUninterruptibly();
         }
 
         if (isPreview())
@@ -794,7 +807,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         StreamingMetrics.totalIncomingBytes.inc(headerSize);
         metrics.incomingBytes.inc(headerSize);
         // send back file received message
-        channel.sendControlMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
+        channel.sendControlMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber)).syncUninterruptibly();
         StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
         long receivedStartNanos = nanoTime();
         try
@@ -837,14 +850,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         logger.debug("[Stream #{}] handling Complete message, state = {}", planId(), state);
 
-        if (!isFollower)
+        if (!isFollower) // initiator
         {
-            if (state == State.WAIT_COMPLETE)
-                closeSession(State.COMPLETE);
-            else
-                state(State.WAIT_COMPLETE);
+            initiatorCompleteOrWait();
         }
-        else
+        else // follower
         {
             // pre-4.0 nodes should not be connected via streaming, see {@link MessagingService#accept_streaming}
             throw new IllegalStateException(String.format("[Stream #%s] Complete message can be only received by the initiator!", planId()));
@@ -864,22 +874,35 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             return true;
 
         maybeCompleted = true;
-        if (!isFollower)
+        if (!isFollower) // initiator
         {
-            if (state == State.WAIT_COMPLETE)
-                closeSession(State.COMPLETE);
-            else
-                state(State.WAIT_COMPLETE);
+            initiatorCompleteOrWait();
         }
-        else
+        else // follower
         {
-            channel.sendControlMessage(new CompleteMessage());
+            // After sending the message the initiator can close the channel which will cause a ClosedChannelException
+            // in buffer logic; this then gets sent to onError, which validates the state via isFinalState and, if not final, fails
+            // the session.  To avoid a race condition between sending and setting state, make sure to update the state
+            // before sending the message (without closing the channel)
+            // see CASSANDRA-17116
+            state(State.COMPLETE);
+            channel.sendControlMessage(new CompleteMessage()).syncUninterruptibly();
             closeSession(State.COMPLETE);
         }
 
         return true;
     }
 
+    private void initiatorCompleteOrWait()
+    {
+        // This is called when coordination completes AND when the COMPLETE message is seen; it is possible that the
+        // COMPLETE message is seen first!
+        if (state == State.WAIT_COMPLETE)
+            closeSession(State.COMPLETE);
+        else
+            state(State.WAIT_COMPLETE);
+    }
+
     /**
      * Call back on receiving {@code StreamMessage.Type.SESSION_FAILED} message.
      */
@@ -988,6 +1011,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 {
                     // pass the session planId/index to the OFM (which is only set at init(), after the transfers have already been created)
                     ofm.header.addSessionInfo(this);
+                    // do not sync here as this does disk access
                     channel.sendControlMessage(ofm);
                 }
             }
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
index 3403a24..ff1e61f 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.streaming.StreamingDataOutputPlus;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReceiveException;
 import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class IncomingStreamMessage extends StreamMessage
 {
@@ -54,7 +53,9 @@ public class IncomingStreamMessage extends StreamMessage
             }
             catch (Throwable t)
             {
-                JVMStabilityInspector.inspectThrowable(t);
+                if (t instanceof StreamReceiveException)
+                    throw (StreamReceiveException) t;
+                // make sure to wrap so the caller always has access to the session to call onError
                 throw new StreamReceiveException(session, t);
             }
         }
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index f8acb22..1d3c09f 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -140,15 +140,26 @@ public final class JVMStabilityInspector
             if (t instanceof FSError || t instanceof CorruptSSTableException)
                 isUnstable = true;
 
-        fn.accept(t);
-
         // Check for file handle exhaustion
         if (t instanceof FileNotFoundException || t instanceof FileSystemException || t instanceof SocketException)
             if (t.getMessage() != null && t.getMessage().contains("Too many open files"))
                 isUnstable = true;
 
         if (isUnstable)
+        {
+            if (!StorageService.instance.isDaemonSetupCompleted())
+                FileUtils.handleStartupFSError(t);
             killer.killCurrentJVM(t);
+        }
+
+        try
+        {
+            fn.accept(t);
+        }
+        catch (Exception | Error e)
+        {
+            logger.warn("Unexpected error while handling unexpected error", e);
+        }
 
         if (t.getCause() != null)
             inspectThrowable(t.getCause(), fn);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 753f874..70d9d01 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -50,7 +50,6 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
-
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.collect.ImmutableSet;
@@ -82,18 +81,18 @@ import org.apache.cassandra.distributed.api.LogAction;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.InstanceClassLoader;
-import org.apache.cassandra.utils.Isolated;
 import org.apache.cassandra.distributed.shared.MessageFilters;
 import org.apache.cassandra.distributed.shared.Metrics;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.distributed.shared.ShutdownException;
 import org.apache.cassandra.distributed.shared.Versions;
 import org.apache.cassandra.io.util.PathUtils;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Isolated;
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.Shared.Recursive;
 import org.apache.cassandra.utils.concurrent.Condition;
-import org.apache.cassandra.utils.FBUtilities;
 import org.reflections.Reflections;
 import org.reflections.scanners.TypeAnnotationsScanner;
 import org.reflections.util.ConfigurationBuilder;
@@ -273,12 +272,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
 
         public boolean isShutdown()
         {
-            return isShutdown;
+            IInvokableInstance delegate = this.delegate;
+            // if the instance shuts down on its own, detect that
+            return isShutdown || (delegate != null && delegate.isShutdown());
         }
 
         private boolean isRunning()
         {
-            return !isShutdown;
+            return !isShutdown();
         }
 
         @Override
@@ -301,6 +302,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
             if (isRunning())
                 throw new IllegalStateException("Can not start a instance that is already running");
             isShutdown = false;
+            // if the delegate isn't running, remove so it can be recreated
+            if (delegate != null && delegate.isShutdown())
+                delegate = null;
             if (!broadcastAddress.equals(config.broadcastAddress()))
             {
                 // previous address != desired address, so cleanup
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 8ed9083..e5a933d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -610,7 +610,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 registerInboundFilter(cluster);
                 registerOutboundFilter(cluster);
 
-                JVMStabilityInspector.replaceKiller(new InstanceKiller());
+                JVMStabilityInspector.replaceKiller(new InstanceKiller(Instance.this::shutdown));
 
                 // TODO: this is more than just gossip
                 StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
index e7ca49b..38b045b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
@@ -26,6 +27,13 @@ public class InstanceKiller extends JVMStabilityInspector.Killer
 {
     private static final AtomicLong KILL_ATTEMPTS = new AtomicLong(0);
 
+    private final Consumer<Boolean> onKill;
+
+    public InstanceKiller(Consumer<Boolean> onKill)
+    {
+        this.onKill = onKill != null ? onKill : ignore -> {};
+    }
+
     public static long getKillAttempts()
     {
         return KILL_ATTEMPTS.get();
@@ -40,6 +48,7 @@ public class InstanceKiller extends JVMStabilityInspector.Killer
     protected void killCurrentJVM(Throwable t, boolean quiet)
     {
         KILL_ATTEMPTS.incrementAndGet();
+        onKill.accept(quiet);
         // the bad part is that System.exit kills the JVM, so all code which calls kill won't hit the
         // next line; yet in in-JVM dtests System.exit is not desirable, so need to rely on a runtime exception
         // as a means to try to stop execution
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
index 7feefa3..b95f854 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,8 +46,6 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
@@ -55,10 +54,12 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
-import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
 import org.apache.cassandra.distributed.impl.InstanceKiller;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -72,6 +73,7 @@ import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 import org.apache.cassandra.service.StorageService;
+import org.awaitility.Awaitility;
 
 @RunWith(Parameterized.class)
 public class FailingRepairTest extends TestBaseImpl implements Serializable
@@ -162,7 +164,12 @@ public class FailingRepairTest extends TestBaseImpl implements Serializable
     public void cleanupState()
     {
         for (int i = 1; i <= CLUSTER.size(); i++)
-            CLUSTER.get(i).runOnInstance(InstanceKiller::clear);
+        {
+            IInvokableInstance inst = CLUSTER.get(i);
+            if (inst.isShutdown())
+                inst.startup();
+            inst.runOnInstance(InstanceKiller::clear);
+        }
     }
 
     @Test(timeout = 10 * 60 * 1000)
@@ -240,10 +247,7 @@ public class FailingRepairTest extends TestBaseImpl implements Serializable
         // it's possible that the coordinator gets the message that the replica failed before the replica completes
         // shutting down; this then means that isKilled could be updated after the fact
         IInvokableInstance replicaInstance = CLUSTER.get(replica);
-        while (replicaInstance.killAttempts() <= 0)
-            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-
-        Assert.assertEquals("replica should be killed", 1, replicaInstance.killAttempts());
+        Awaitility.await().atMost(Duration.ofSeconds(30)).until(replicaInstance::isShutdown);
         Assert.assertEquals("coordinator should not be killed", 0, CLUSTER.get(coordinator).killAttempts());
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
index e42f87c..72fd7f9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
@@ -121,7 +121,10 @@ public class StreamingTest extends TestBaseImpl
             // verify on follower's stream session
             MessageStateSinkImpl followerSink = new MessageStateSinkImpl();
             followerSink.messages(initiator, Arrays.asList(STREAM_INIT, PREPARE_SYN, PREPARE_ACK, RECEIVED));
-            followerSink.states(initiator,  Arrays.asList(PREPARING, STREAMING, StreamSession.State.COMPLETE));
+            // why 2 completes?  There is a race condition bug with sending COMPLETE where the socket gets closed
+            // by the initiator, which then triggers a ClosedChannelException, which then checks the current state (PREPARING)
+            // to solve this, COMPLETE is set before sending the message, and reset when closing the stream
+            followerSink.states(initiator,  Arrays.asList(PREPARING, STREAMING, StreamSession.State.COMPLETE, StreamSession.State.COMPLETE));
             followerNode.runOnInstance(() -> StreamSession.sink = followerSink);
         }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamCloseInMiddleTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamCloseInMiddleTest.java
new file mode 100644
index 0000000..fc52ab6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamCloseInMiddleTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class StreamCloseInMiddleTest extends TestBaseImpl
+{
+    @Test
+    public void zeroCopy() throws IOException
+    {
+        streamClose(true);
+    }
+
+    @Test
+    public void notZeroCopy() throws IOException
+    {
+        streamClose(false);
+    }
+
+    private void streamClose(boolean zeroCopyStreaming) throws IOException
+    {
+        try (Cluster cluster = Cluster.build(2)
+                                      .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
+                                      .withInstanceInitializer(BBHelper::install)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        .set("stream_entire_sstables", zeroCopyStreaming)
+                                                        // when set to die, this will try to halt the JVM, which is easier to validate in the test
+                                                        // other levels require checking state of the subsystems
+                                                        .set("disk_failure_policy", "die"))
+                                      .start())
+        {
+            init(cluster);
+
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));
+
+            triggerStreaming(cluster, zeroCopyStreaming);
+            // make sure disk failure policy is not triggered
+            assertNoNodeShutdown(cluster);
+
+            // now bootstrap a new node; streaming will fail
+            IInvokableInstance node3 = ClusterUtils.addInstance(cluster, cluster.get(1).config(), c -> c.set("auto_bootstrap", true));
+            node3.startup();
+            for (String line : Arrays.asList("Error while waiting on bootstrap to complete. Bootstrap will have to be restarted", // bootstrap failed
+                                             "Some data streaming failed. Use nodetool to check bootstrap state and resume")) // didn't join ring because bootstrap failed
+                Assertions.assertThat(node3.logs().grep(line).getResult())
+                          .hasSize(1);
+
+            assertNoNodeShutdown(cluster);
+        }
+    }
+
+    private void assertNoNodeShutdown(Cluster cluster)
+    {
+        AssertionError t = null;
+        for (IInvokableInstance i : cluster.stream().collect(Collectors.toList()))
+        {
+            try
+            {
+                Assertions.assertThat(i.isShutdown()).describedAs("%s was shutdown; this is not expected", i).isFalse();
+                Assertions.assertThat(i.killAttempts()).describedAs("%s saw kill attempts; this is not expected", i).isEqualTo(0);
+            }
+            catch (AssertionError t2)
+            {
+                if (t == null)
+                    t = t2;
+                else
+                    t.addSuppressed(t2);
+            }
+        }
+        if (t != null)
+            throw t;
+    }
+
+    private static void triggerStreaming(Cluster cluster, boolean expectedEntireSSTable)
+    {
+        IInvokableInstance node1 = cluster.get(1);
+        IInvokableInstance node2 = cluster.get(2);
+
+        // repair will do streaming IFF there is a mismatch; so cause one
+        for (int i = 0; i < 10; i++)
+            node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES (?)"), i); // timestamp won't match, causing a mismatch
+
+        // trigger streaming; expected to fail as streaming socket closed in the middle (currently this is an unrecoverable event)
+        node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl").asserts().failure();
+
+        assertStreamingType(node2, expectedEntireSSTable);
+    }
+
+    private static void assertStreamingType(IInvokableInstance node, boolean expectedEntireSSTable)
+    {
+        String key = "org.apache.cassandra.metrics.Streaming.%s./127.0.0.1.7012";
+        long entire = node.metrics().getCounter(String.format(key, "EntireSSTablesStreamedIn"));
+        long partial = node.metrics().getCounter(String.format(key, "PartialSSTablesStreamedIn"));
+        if (expectedEntireSSTable)
+        {
+            Assertions.assertThat(partial).isEqualTo(0);
+            Assertions.assertThat(entire).isGreaterThan(0);
+        }
+        else
+        {
+            Assertions.assertThat(partial).isGreaterThan(0);
+            Assertions.assertThat(entire).isEqualTo(0);
+        }
+    }
+
+    public static class BBHelper
+    {
+        @SuppressWarnings("unused")
+        public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall Callable<Integer> zuper) throws Exception
+        {
+            if (isCaller(BigTableZeroCopyWriter.class.getName(), "write"))
+                throw new java.nio.channels.ClosedChannelException();
+            // different context; pass through
+            return zuper.call();
+        }
+
+        @SuppressWarnings("unused")
+        public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception
+        {
+            if (isCaller(CassandraIncomingFile.class.getName(), "read")) // handles compressed and non-compressed
+                throw new java.nio.channels.ClosedChannelException();
+            // different context; pass through
+            return zuper.call();
+        }
+
+        private static boolean isCaller(String klass, String method)
+        {
+            //TODO is there a cleaner way to check this?
+            StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+            for (int i = 0; i < stack.length; i++)
+            {
+                StackTraceElement e = stack[i];
+                if (klass.equals(e.getClassName()) && method.equals(e.getMethodName()))
+                    return true;
+            }
+            return false;
+        }
+
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            new ByteBuddy().rebase(SequentialWriter.class)
+                           .method(named("writeDirectlyToChannel").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+                           .method(named("append").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(BBHelper.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
index f62936c..35c4fb3 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
@@ -51,8 +51,8 @@ public class MixedModeGossipTest extends UpgradeTestBase
         .nodes(3)
         .nodesToUpgradeOrdered(1, 2, 3)
         // all upgrades from v30 up, excluding v30->v3X and from v40
-        .singleUpgrade(v30, v40)
-        .singleUpgrade(v3X, v40)
+        .singleUpgrade(v30)
+        .singleUpgrade(v3X)
         .setup(c -> {})
         .runAfterNodeUpgrade((cluster, node) -> {
             if (node == 1) {
@@ -87,8 +87,8 @@ public class MixedModeGossipTest extends UpgradeTestBase
         .nodes(3)
         .nodesToUpgradeOrdered(1, 2, 3)
         // all upgrades from v30 up, excluding v30->v3X and from v40
-        .singleUpgrade(v30, v40)
-	      .singleUpgrade(v3X, v40)
+        .singleUpgrade(v30)
+        .singleUpgrade(v3X)
         .setup(cluster -> {
             // node2 and node3 gossiper cannot talk with each other
             cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(2).to(3).drop();
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
index 72b7f1a..8010853 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@ -83,7 +83,7 @@ public class MixedModeMessageForwardTest extends UpgradeTestBase
         .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK).set("request_timeout_in_ms", 30000))
         .withBuilder(b -> b.withRacks(numDCs, 1, nodesPerDc))
         .nodes(numDCs * nodesPerDc)
-        .singleUpgrade(v30, v40)
+        .singleUpgrade(v30)
         .setup(cluster -> {
             cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE +
                 " WITH replication = {'class': 'NetworkTopologyStrategy', " + ntsArgs + " };");
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
index 631f65a..b11678d 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
@@ -40,8 +40,8 @@ public class MixedModeReadTest extends UpgradeTestBase
         .nodes(2)
         .nodesToUpgrade(1)
         // all upgrades from v30 up, excluding v30->v3X and from v40
-        .singleUpgrade(v30, v40)
-	      .singleUpgrade(v3X, v40)
+        .singleUpgrade(v30)
+        .singleUpgrade(v3X)
         .setup(cluster -> {
             cluster.schemaChange(CREATE_TABLE);
             insertData(cluster.coordinator(1));
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRepairTest.java
index 478a1b7..813d9f2 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRepairTest.java
@@ -54,7 +54,7 @@ public class MixedModeRepairTest extends UpgradeTestBase
         new UpgradeTestBase.TestCase()
         .nodes(2)
         .nodesToUpgrade(UPGRADED_NODE)
-		    .singleUpgrade(v3X, v40)
+        .singleUpgrade(v3X)
         .withConfig(config -> config.with(NETWORK, GOSSIP))
         .setup(cluster -> {
             cluster.schemaChange(CREATE_TABLE);
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
index 3e6c9ca..4cca7b9 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
@@ -35,7 +35,7 @@ public class Pre40MessageFilterTest extends UpgradeTestBase
         .withConfig(configConsumer)
         .nodesToUpgrade(1)
         // all upgrades from v30 up, excluding v30->v3X
-        .singleUpgrade(v30, v40)
+        .singleUpgrade(v30)
         .upgradesFrom(v3X)
         .setup((cluster) -> {
             cluster.filters().outbound().allVerbs().messagesMatching((f,t,m) -> false).drop();
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 768c6cc..ce0d8e9 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -32,6 +32,9 @@ import com.vdurmont.semver4j.Semver.SemverType;
 import org.junit.After;
 import org.junit.BeforeClass;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.UpgradeableCluster;
@@ -41,6 +44,7 @@ import org.apache.cassandra.distributed.shared.DistributedTestBase;
 import org.apache.cassandra.distributed.shared.ThrowingRunnable;
 import org.apache.cassandra.distributed.shared.Versions;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.distributed.shared.Versions.Version;
@@ -50,6 +54,8 @@ import static org.apache.cassandra.distributed.shared.Versions.find;
 
 public class UpgradeTestBase extends DistributedTestBase
 {
+    private static final Logger logger = LoggerFactory.getLogger(UpgradeTestBase.class);
+
     @After
     public void afterEach()
     {
@@ -157,9 +163,9 @@ public class UpgradeTestBase extends DistributedTestBase
         }
 
         /** Will test this specific upgrade path **/
-        public TestCase singleUpgrade(Semver from, Semver to)
+        public TestCase singleUpgrade(Semver from)
         {
-            this.upgrade.add(new TestVersions(versions.getLatest(from), versions.getLatest(to)));
+            this.upgrade.add(new TestVersions(versions.getLatest(from), versions.getLatest(CURRENT)));
             return this;
         }
 
@@ -219,7 +225,7 @@ public class UpgradeTestBase extends DistributedTestBase
 
             for (TestVersions upgrade : this.upgrade)
             {
-                System.out.printf("testing upgrade from %s to %s%n", upgrade.initial.version, upgrade.upgrade.version);
+                logger.info("testing upgrade from {} to {}", upgrade.initial.version, upgrade.upgrade.version);
                 try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, builderConsumer)))
                 {
                     setup.run(cluster);
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index d9bbf10..84c6415 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -241,7 +241,7 @@ public class ZeroCopyStreamingBenchmark
 
     @Benchmark
     @BenchmarkMode(Mode.Throughput)
-    public void blockStreamReader(BenchmarkState state) throws Exception
+    public void blockStreamReader(BenchmarkState state) throws Throwable
     {
         EmbeddedChannel channel = createMockNettyChannel();
         AsyncStreamingInputPlus in = new AsyncStreamingInputPlus(channel);
@@ -265,7 +265,7 @@ public class ZeroCopyStreamingBenchmark
 
     @Benchmark
     @BenchmarkMode(Mode.Throughput)
-    public void partialStreamReader(BenchmarkState state) throws Exception
+    public void partialStreamReader(BenchmarkState state) throws Throwable
     {
         EmbeddedChannel channel = createMockNettyChannel();
         AsyncStreamingInputPlus in = new AsyncStreamingInputPlus(channel);
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index bf7c148..c3ffe1f 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -129,7 +129,7 @@ public class CassandraEntireSSTableStreamWriterTest
     }
 
     @Test
-    public void testBlockReadingAndWritingOverWire() throws Exception
+    public void testBlockReadingAndWritingOverWire() throws Throwable
     {
         StreamSession session = setupStreamingSessionForTest();
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
diff --git a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
index 4d2242b..edd2c24 100644
--- a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
@@ -160,7 +160,7 @@ public class EntireSSTableStreamConcurrentComponentMutationTest
     }
 
     @Test
-    public void testStream() throws Exception
+    public void testStream() throws Throwable
     {
         testStreamWithConcurrentComponentMutation(NO_OP, NO_OP);
     }
@@ -170,7 +170,7 @@ public class EntireSSTableStreamConcurrentComponentMutationTest
      * update causes the actual transferred file size to be different from the one in {@link ComponentManifest}
      */
     @Test
-    public void testStreamWithStatsMutation() throws Exception
+    public void testStreamWithStatsMutation() throws Throwable
     {
         testStreamWithConcurrentComponentMutation(() -> {
 
@@ -188,7 +188,7 @@ public class EntireSSTableStreamConcurrentComponentMutationTest
             targetLocation = "AFTER INVOKE serialize",
             condition = "$descriptor.cfname.contains(\"Standard1\")",
             action = "org.apache.cassandra.db.streaming.EntireSSTableStreamConcurrentComponentMutationTest.countDown();Thread.sleep(5000);")
-    public void testStreamWithIndexSummaryRedistributionDelaySavingSummary() throws Exception
+    public void testStreamWithIndexSummaryRedistributionDelaySavingSummary() throws Throwable
     {
         testStreamWithConcurrentComponentMutation(() -> {
             // wait until new index summary is partially written
@@ -203,7 +203,7 @@ public class EntireSSTableStreamConcurrentComponentMutationTest
         latch.countDown();
     }
 
-    private void testStreamWithConcurrentComponentMutation(Callable<?> runBeforeStreaming, Callable<?> runConcurrentWithStreaming) throws Exception
+    private void testStreamWithConcurrentComponentMutation(Callable<?> runBeforeStreaming, Callable<?> runConcurrentWithStreaming) throws Throwable
     {
         ByteBuf serializedFile = Unpooled.buffer(8192);
         InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
index d3aed25..ddd3a13 100644
--- a/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
@@ -19,7 +19,9 @@
 package org.apache.cassandra.io.sstable.format.big;
 
 import java.io.ByteArrayInputStream;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collection;
@@ -161,7 +163,14 @@ public class BigTableZeroCopyWriterTest
             {
                 Pair<DataInputPlus, Long> pair = getSSTableComponentData(sstable, component, bufferMapper);
 
-                btzcw.writeComponent(component.type, pair.left, pair.right);
+                try
+                {
+                    btzcw.writeComponent(component.type, pair.left, pair.right);
+                }
+                catch (ClosedChannelException e)
+                {
+                    throw new UncheckedIOException(e);
+                }
             }
         }
 
diff --git a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
index f86be38..3a3415e 100644
--- a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
+++ b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.utils;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.SocketException;
+import java.util.Arrays;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -30,6 +31,10 @@ import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.assertj.core.api.Assertions;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.DefaultFSErrorHandler;
+import org.apache.cassandra.service.StorageService;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -54,43 +59,62 @@ public class JVMStabilityInspectorTest
 
         Config.DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
         Config.CommitFailurePolicy oldCommitPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
         try
         {
-            killerForTests.reset();
-            JVMStabilityInspector.inspectThrowable(new IOException());
-            assertFalse(killerForTests.wasKilled());
-
-            DatabaseDescriptor.setDiskFailurePolicy(Config.DiskFailurePolicy.die);
-            killerForTests.reset();
-            JVMStabilityInspector.inspectThrowable(new FSReadError(new IOException(), "blah"));
-            assertTrue(killerForTests.wasKilled());
-
-            killerForTests.reset();
-            JVMStabilityInspector.inspectThrowable(new FSWriteError(new IOException(), "blah"));
-            assertTrue(killerForTests.wasKilled());
-
-            killerForTests.reset();
-            JVMStabilityInspector.inspectThrowable(new CorruptSSTableException(new IOException(), "blah"));
-            assertTrue(killerForTests.wasKilled());
-
-            killerForTests.reset();
-            JVMStabilityInspector.inspectThrowable(new RuntimeException(new CorruptSSTableException(new IOException(), "blah")));
-            assertTrue(killerForTests.wasKilled());
-
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
-            killerForTests.reset();
-            JVMStabilityInspector.inspectCommitLogThrowable(new Throwable());
-            assertTrue(killerForTests.wasKilled());
-
-            killerForTests.reset();
-            JVMStabilityInspector.inspectThrowable(new Exception(new IOException()));
-            assertFalse(killerForTests.wasKilled());
+            CassandraDaemon daemon = new CassandraDaemon();
+            daemon.completeSetup();
+            for (boolean daemonSetupCompleted : Arrays.asList(false, true))
+            {
+                // disk policy acts differently depending on if setup is complete or not; which is defined by
+                // the daemon thread not being null
+                StorageService.instance.registerDaemon(daemonSetupCompleted ? daemon : null);
+
+                try
+                {
+                    killerForTests.reset();
+                    JVMStabilityInspector.inspectThrowable(new IOException());
+                    assertFalse(killerForTests.wasKilled());
+
+                    DatabaseDescriptor.setDiskFailurePolicy(Config.DiskFailurePolicy.die);
+                    killerForTests.reset();
+                    JVMStabilityInspector.inspectThrowable(new FSReadError(new IOException(), "blah"));
+                    assertTrue(killerForTests.wasKilled());
+
+                    killerForTests.reset();
+                    JVMStabilityInspector.inspectThrowable(new FSWriteError(new IOException(), "blah"));
+                    assertTrue(killerForTests.wasKilled());
+
+                    killerForTests.reset();
+                    JVMStabilityInspector.inspectThrowable(new CorruptSSTableException(new IOException(), "blah"));
+                    assertTrue(killerForTests.wasKilled());
+
+                    killerForTests.reset();
+                    JVMStabilityInspector.inspectThrowable(new RuntimeException(new CorruptSSTableException(new IOException(), "blah")));
+                    assertTrue(killerForTests.wasKilled());
+
+                    DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
+                    killerForTests.reset();
+                    JVMStabilityInspector.inspectCommitLogThrowable(new Throwable());
+                    assertTrue(killerForTests.wasKilled());
+
+                    killerForTests.reset();
+                    JVMStabilityInspector.inspectThrowable(new Exception(new IOException()));
+                    assertFalse(killerForTests.wasKilled());
+                }
+                catch (Exception | Error e)
+                {
+                    throw new AssertionError("Failure when daemonSetupCompleted=" + daemonSetupCompleted, e);
+                }
+            }
         }
         finally
         {
             JVMStabilityInspector.replaceKiller(originalKiller);
             DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
             DatabaseDescriptor.setCommitFailurePolicy(oldCommitPolicy);
+            StorageService.instance.registerDaemon(null);
+            FileUtils.setFSErrorHandler(null);
         }
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org