You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2022/02/04 19:37:10 UTC
[cassandra] branch trunk updated: When streaming sees a ClosedChannelException this triggers the disk failure policy
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new df16b37 When streaming sees a ClosedChannelException this triggers the disk failure policy
df16b37 is described below
commit df16b3750dc2c1b6b9bcdece6f81dfd3de7ebdfa
Author: David Capwell <dc...@apache.org>
AuthorDate: Fri Feb 4 10:15:58 2022 -0800
When streaming sees a ClosedChannelException this triggers the disk failure policy
patch by David Capwell, Francisco Guerrero; reviewed by Caleb Rackliffe, Dinesh Joshi for CASSANDRA-17116
---
CHANGES.txt | 1 +
.../streaming/CassandraCompressedStreamReader.java | 13 +-
.../CassandraEntireSSTableStreamReader.java | 10 +-
.../db/streaming/CassandraIncomingFile.java | 4 +-
.../db/streaming/CassandraStreamReader.java | 40 +++--
.../cassandra/db/streaming/ComponentManifest.java | 8 +
.../cassandra/db/streaming/IStreamReader.java | 4 +-
.../org/apache/cassandra/io/FSErrorHandler.java | 1 +
.../sstable/format/big/BigTableZeroCopyWriter.java | 12 +-
.../org/apache/cassandra/io/util/FileUtils.java | 5 +
.../apache/cassandra/metrics/StreamingMetrics.java | 10 ++
.../cassandra/service/DefaultFSErrorHandler.java | 3 +-
.../cassandra/service/NativeTransportService.java | 3 +-
.../apache/cassandra/service/StorageService.java | 4 +-
.../apache/cassandra/streaming/IncomingStream.java | 4 +-
.../streaming/StreamDeserializingTask.java | 8 +-
.../streaming/StreamReceiveException.java | 4 +-
.../cassandra/streaming/StreamResultFuture.java | 2 +-
.../apache/cassandra/streaming/StreamSession.java | 70 +++++---
.../streaming/messages/IncomingStreamMessage.java | 5 +-
.../cassandra/utils/JVMStabilityInspector.java | 15 +-
.../distributed/impl/AbstractCluster.java | 16 +-
.../cassandra/distributed/impl/Instance.java | 2 +-
.../cassandra/distributed/impl/InstanceKiller.java | 9 +
.../distributed/test/FailingRepairTest.java | 20 ++-
.../cassandra/distributed/test/StreamingTest.java | 5 +-
.../test/streaming/StreamCloseInMiddleTest.java | 196 +++++++++++++++++++++
.../distributed/upgrade/MixedModeGossipTest.java | 8 +-
.../upgrade/MixedModeMessageForwardTest.java | 2 +-
.../distributed/upgrade/MixedModeReadTest.java | 4 +-
.../distributed/upgrade/MixedModeRepairTest.java | 2 +-
.../upgrade/Pre40MessageFilterTest.java | 2 +-
.../distributed/upgrade/UpgradeTestBase.java | 12 +-
.../microbench/ZeroCopyStreamingBenchmark.java | 4 +-
.../CassandraEntireSSTableStreamWriterTest.java | 2 +-
...TableStreamConcurrentComponentMutationTest.java | 8 +-
.../format/big/BigTableZeroCopyWriterTest.java | 11 +-
.../cassandra/utils/JVMStabilityInspectorTest.java | 82 ++++++---
38 files changed, 461 insertions(+), 150 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index fe3fb22..a16f5f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * When streaming sees a ClosedChannelException this triggers the disk failure policy (CASSANDRA-17116)
* Add a virtual table for exposing prepared statements metrics (CASSANDRA-17224)
* Remove python 2.x support from cqlsh (CASSANDRA-17242)
* Prewarm role and credential caches to avoid timeouts at startup (CASSANDRA-16958)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index cfa9018..dda874b 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.streaming;
import java.io.IOException;
-import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +33,6 @@ import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
/**
* CassandraStreamReader that reads from streamed compressed SSTable
*/
@@ -57,7 +54,7 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
*/
@Override
@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
- public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+ public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable
{
long totalSize = totalSize();
@@ -110,12 +107,8 @@ public class CassandraCompressedStreamReader extends CassandraStreamReader
logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
if (writer != null)
- {
- writer.abort(e);
- }
- if (extractIOExceptionCause(e).isPresent())
- throw e;
- throw Throwables.propagate(e);
+ e = writer.abort(e);
+ throw e;
}
}
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index e547e0f..261c59e 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -22,14 +22,12 @@ import java.io.IOException;
import java.util.Collection;
import java.util.function.UnaryOperator;
-import com.google.common.base.Throwables;
-import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.io.util.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -37,6 +35,7 @@ import org.apache.cassandra.io.sstable.format.SSTableFormat;
import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.File;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReceiver;
@@ -85,7 +84,7 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader
*/
@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
@Override
- public SSTableMultiWriter read(DataInputPlus in) throws IOException
+ public SSTableMultiWriter read(DataInputPlus in) throws Throwable
{
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
if (cfs == null)
@@ -147,8 +146,7 @@ public class CassandraEntireSSTableStreamReader implements IStreamReader
logger.error("[Stream {}] Error while reading sstable from stream for table = {}", session.planId(), cfs.metadata(), e);
if (writer != null)
e = writer.abort(e);
- Throwables.throwIfUnchecked(e);
- throw new RuntimeException(e);
+ throw e;
}
}
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index 11a18a0..74946a8 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.streaming;
-import java.io.IOException;
import java.util.Objects;
import com.google.common.base.Preconditions;
@@ -64,10 +63,11 @@ public class CassandraIncomingFile implements IncomingStream
}
@Override
- public synchronized void read(DataInputPlus in, int version) throws IOException
+ public synchronized void read(DataInputPlus in, int version) throws Throwable
{
CassandraStreamHeader streamHeader = CassandraStreamHeader.serializer.deserialize(in, version);
logger.debug("Incoming stream entireSSTable={} components={}", streamHeader.isEntireSSTable, streamHeader.componentManifest);
+ session.countStreamedIn(streamHeader.isEntireSSTable);
IStreamReader reader;
if (streamHeader.isEntireSSTable)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 6835fad..7d39e83 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -17,31 +17,39 @@
*/
package org.apache.cassandra.db.streaming;
-import java.io.*;
+import java.io.IOError;
+import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.UnmodifiableIterator;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.exceptions.UnknownColumnException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.TrackedDataInputPlus;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.exceptions.UnknownColumnException;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
@@ -98,16 +106,14 @@ public class CassandraStreamReader implements IStreamReader
*/
@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
@Override
- public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+ public SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable
{
long totalSize = totalSize();
ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
if (cfs == null)
- {
// schema was dropped during streaming
- throw new IOException("CF " + tableId + " was dropped during streaming");
- }
+ throw new IllegalStateException("Table " + tableId + " was dropped during streaming");
logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
@@ -136,10 +142,8 @@ public class CassandraStreamReader implements IStreamReader
logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e);
if (writer != null)
- {
- writer.abort(e);
- }
- throw Throwables.propagate(e);
+ e = writer.abort(e);
+ throw e;
}
}
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
index 71aa0f8..b77b594 100644
--- a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -106,6 +106,14 @@ public final class ComponentManifest implements Iterable<Component>
return components.hashCode();
}
+ @Override
+ public String toString()
+ {
+ return "ComponentManifest{" +
+ "components=" + components +
+ '}';
+ }
+
public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
{
public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException
diff --git a/src/java/org/apache/cassandra/db/streaming/IStreamReader.java b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
index cf93bc2..e7cb2a2 100644
--- a/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.db.streaming;
-import java.io.IOException;
-
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -28,5 +26,5 @@ import org.apache.cassandra.io.util.DataInputPlus;
*/
public interface IStreamReader
{
- public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException;
+ SSTableMultiWriter read(DataInputPlus inputPlus) throws Throwable;
}
diff --git a/src/java/org/apache/cassandra/io/FSErrorHandler.java b/src/java/org/apache/cassandra/io/FSErrorHandler.java
index 081ec0b..b7d2836 100644
--- a/src/java/org/apache/cassandra/io/FSErrorHandler.java
+++ b/src/java/org/apache/cassandra/io/FSErrorHandler.java
@@ -27,4 +27,5 @@ public interface FSErrorHandler
{
void handleCorruptSSTable(CorruptSSTableException e);
void handleFSError(FSError e);
+ default void handleStartupFSError(Throwable t) {}
}
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
index 717e9d9..c47c23b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable.format.big;
import java.io.EOFException;
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
import java.util.Collection;
import java.util.EnumMap;
import java.util.Map;
@@ -198,7 +199,7 @@ public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWrite
writer.close();
}
- public void writeComponent(Component.Type type, DataInputPlus in, long size)
+ public void writeComponent(Component.Type type, DataInputPlus in, long size) throws ClosedChannelException
{
logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
@@ -208,7 +209,7 @@ public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWrite
write(in, size, componentWriters.get(type));
}
- private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer)
+ private void write(AsyncStreamingInputPlus in, long size, SequentialWriter writer) throws ClosedChannelException
{
logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
@@ -222,6 +223,13 @@ public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWrite
{
in.close();
}
+ catch (ClosedChannelException e)
+ {
+ // FSWriteError triggers disk failure policy, but if we get a connection issue we do not want to do that
+ // so rethrow so the error handling logic higher up is able to deal with this
+ // see CASSANDRA-17116
+ throw e;
+ }
catch (IOException e)
{
throw new FSWriteError(e, writer.getPath());
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 45abd7b..ea54497 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -463,6 +463,11 @@ public final class FileUtils
fsErrorHandler.get().ifPresent(handler -> handler.handleFSError(e));
}
+ public static void handleStartupFSError(Throwable t)
+ {
+ fsErrorHandler.get().ifPresent(handler -> handler.handleStartupFSError(t));
+ }
+
/**
* handleFSErrorAndPropagate will invoke the disk failure policy error handler,
* which may or may not stop the daemon or transports. However, if we don't exit,
diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
index 54df233..e38b605 100644
--- a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
@@ -47,6 +47,8 @@ public class StreamingMetrics
public final Counter outgoingBytes;
/* Measures the time taken for processing the incoming stream message after being deserialized, including the time to flush to disk. */
public final Timer incomingProcessTime;
+ private final Counter entireSSTablesStreamedIn;
+ private final Counter partialSSTablesStreamedIn;
public static StreamingMetrics get(InetAddressAndPort ip)
{
@@ -79,5 +81,13 @@ public class StreamingMetrics
incomingBytes = Metrics.counter(factory.createMetricName("IncomingBytes"));
outgoingBytes= Metrics.counter(factory.createMetricName("OutgoingBytes"));
incomingProcessTime = Metrics.timer(factory.createMetricName("IncomingProcessTime"));
+
+ entireSSTablesStreamedIn = Metrics.counter(factory.createMetricName("EntireSSTablesStreamedIn"));
+ partialSSTablesStreamedIn = Metrics.counter(factory.createMetricName("PartialSSTablesStreamedIn"));
+ }
+
+ public void countStreamedIn(boolean isEntireSSTable)
+ {
+ (isEntireSSTable ? entireSSTablesStreamedIn : partialSSTablesStreamedIn).inc();
}
}
diff --git a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
index 04cb11c..6a9e15e 100644
--- a/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
+++ b/src/java/org/apache/cassandra/service/DefaultFSErrorHandler.java
@@ -94,7 +94,8 @@ public class DefaultFSErrorHandler implements FSErrorHandler
}
}
- private static void handleStartupFSError(Throwable t)
+ @Override
+ public void handleStartupFSError(Throwable t)
{
switch (DatabaseDescriptor.getDiskFailurePolicy())
{
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java
index 7556f81..f131d74 100644
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -149,7 +149,8 @@ public class NativeTransportService
servers = Collections.emptyList();
// shutdown executors used by netty for native transport server
- workerGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS).awaitUninterruptibly();
+ if (workerGroup != null)
+ workerGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS).awaitUninterruptibly();
Dispatcher.shutdown();
}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index fcff144..d5b676f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -521,9 +521,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public boolean isDaemonSetupCompleted()
{
- return daemon == null
- ? false
- : daemon.setupCompleted();
+ return daemon != null && daemon.setupCompleted();
}
public void stopDaemon()
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStream.java b/src/java/org/apache/cassandra/streaming/IncomingStream.java
index 0733249..25ab626 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStream.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStream.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.streaming;
-import java.io.IOException;
-
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.schema.TableId;
@@ -37,7 +35,7 @@ public interface IncomingStream
/**
* Read in the stream data.
*/
- void read(DataInputPlus inputPlus, int version) throws IOException;
+ void read(DataInputPlus inputPlus, int version) throws Throwable;
String getName();
long getSize();
diff --git a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
index dd2678c..3785249 100644
--- a/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamDeserializingTask.java
@@ -18,10 +18,7 @@
package org.apache.cassandra.streaming;
-import java.nio.channels.ClosedChannelException;
-
import com.google.common.annotations.VisibleForTesting;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,9 +76,6 @@ public class StreamDeserializingTask implements Runnable
session.messageReceived(message);
}
}
- catch (ClosedChannelException ignore)
- {
- }
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
@@ -91,7 +85,7 @@ public class StreamDeserializingTask implements Runnable
}
else if (t instanceof StreamReceiveException)
{
- ((StreamReceiveException)t).session.onError(t);
+ ((StreamReceiveException)t).session.onError(t.getCause());
}
else
{
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveException.java b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
index 54b365a..c564182 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveException.java
@@ -18,7 +18,9 @@
package org.apache.cassandra.streaming;
-public class StreamReceiveException extends RuntimeException
+import java.io.IOException;
+
+public class StreamReceiveException extends IOException
{
public final StreamSession session;
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index f7a0b63..66e99be 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -191,7 +191,7 @@ public final class StreamResultFuture extends AsyncFuture<StreamState>
void handleSessionComplete(StreamSession session)
{
- logger.info("[Stream #{}] Session with {} is complete", session.planId(), session.peer);
+ logger.info("[Stream #{}] Session with {} is {}", session.planId(), session.peer, session.state().name().toLowerCase());
fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
SessionInfo sessionInfo = session.getSessionInfo();
coordinator.addSessionInfo(sessionInfo);
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index bcf19d5..fd24539 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.streaming;
import java.io.EOFException;
import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -55,7 +56,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import static com.google.common.collect.Iterables.all;
-import static org.apache.cassandra.net.MessagingService.current_version;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.locator.InetAddressAndPort.hostAddressAndPort;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
@@ -364,7 +364,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
getPendingRepair(),
getPreviewKind());
- channel.sendControlMessage(message);
+ channel.sendControlMessage(message).sync();
onInitializationComplete();
}
catch (Exception e)
@@ -535,8 +535,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
public void state(State newState)
{
- if (logger.isTraceEnabled())
- logger.trace("[Stream #{}] Changing session state from {} to {}", planId(), state, newState);
+ if (logger.isDebugEnabled())
+ logger.debug("[Stream #{}] Changing session state from {} to {}", planId(), state, newState);
sink.recordState(peer, newState);
state = newState;
@@ -626,7 +626,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
prepare.summaries.add(task.getSummary());
}
- channel.sendControlMessage(prepare);
+ channel.sendControlMessage(prepare).syncUninterruptibly();
}
/**
@@ -636,9 +636,10 @@ public class StreamSession implements IEndpointStateChangeSubscriber
*/
public synchronized Future onError(Throwable e)
{
- boolean isEofException = e instanceof EOFException;
+ boolean isEofException = e instanceof EOFException || e instanceof ClosedChannelException;
if (isEofException)
{
+ State state = this.state;
if (state.finalState)
{
logger.debug("[Stream #{}] Socket closed after session completed with state {}", planId(), state);
@@ -659,7 +660,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
logError(e);
// send session failure message
if (channel.connected())
- channel.sendControlMessage(new SessionFailedMessage());
+ channel.sendControlMessage(new SessionFailedMessage()).syncUninterruptibly();
// fail session
return closeSession(State.FAILED);
}
@@ -703,6 +704,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
});
}
+ public void countStreamedIn(boolean isEntireSSTable)
+ {
+ metrics.countStreamedIn(isEntireSSTable);
+ }
+
/**
* Finish preparing the session. This method is blocking (memtables are flushed in {@link #addTransferRanges}),
* so the logic should not execute on the main IO thread (read: netty event loop).
@@ -718,9 +724,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
if (!peer.equals(FBUtilities.getBroadcastAddressAndPort()))
for (StreamTransferTask task : transfers.values())
prepareSynAck.summaries.add(task.getSummary());
- channel.sendControlMessage(prepareSynAck);
streamResult.handleSessionPrepared(this);
+ // After sending the message the initiator can close the channel which will cause a ClosedChannelException
+ // in buffer logic, this then gets sent to onError which validates the state isFinalState, if not fails
+ // the session. To avoid a race condition between sending and setting state, make sure to update the state
+ // before sending the message (without closing the channel)
+ // see CASSANDRA-17116
+ if (isPreview())
+ state(State.COMPLETE);
+ channel.sendControlMessage(prepareSynAck).syncUninterruptibly();
if (isPreview())
completePreview();
@@ -737,7 +750,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
// only send the (final) ACK if we are expecting the peer to send this node (the initiator) some files
if (!isPreview())
- channel.sendControlMessage(new PrepareAckMessage());
+ channel.sendControlMessage(new PrepareAckMessage()).syncUninterruptibly();
}
if (isPreview())
@@ -794,7 +807,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
StreamingMetrics.totalIncomingBytes.inc(headerSize);
metrics.incomingBytes.inc(headerSize);
// send back file received message
- channel.sendControlMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
+ channel.sendControlMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber)).syncUninterruptibly();
StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
long receivedStartNanos = nanoTime();
try
@@ -837,14 +850,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
logger.debug("[Stream #{}] handling Complete message, state = {}", planId(), state);
- if (!isFollower)
+ if (!isFollower) // initiator
{
- if (state == State.WAIT_COMPLETE)
- closeSession(State.COMPLETE);
- else
- state(State.WAIT_COMPLETE);
+ initiatorCompleteOrWait();
}
- else
+ else // follower
{
// pre-4.0 nodes should not be connected via streaming, see {@link MessagingService#accept_streaming}
throw new IllegalStateException(String.format("[Stream #%s] Complete message can be only received by the initiator!", planId()));
@@ -864,22 +874,35 @@ public class StreamSession implements IEndpointStateChangeSubscriber
return true;
maybeCompleted = true;
- if (!isFollower)
+ if (!isFollower) // initiator
{
- if (state == State.WAIT_COMPLETE)
- closeSession(State.COMPLETE);
- else
- state(State.WAIT_COMPLETE);
+ initiatorCompleteOrWait();
}
- else
+ else // follower
{
- channel.sendControlMessage(new CompleteMessage());
+ // After sending the message the initiator can close the channel which will cause a ClosedChannelException
+ // in buffer logic, this then gets sent to onError which validates the state isFinalState, if not fails
+ // the session. To avoid a race condition between sending and setting state, make sure to update the state
+ // before sending the message (without closing the channel)
+ // see CASSANDRA-17116
+ state(State.COMPLETE);
+ channel.sendControlMessage(new CompleteMessage()).syncUninterruptibly();
closeSession(State.COMPLETE);
}
return true;
}
+ private void initiatorCompleteOrWait()
+ {
+ // This is called when coordination completes AND when COMPLETE message is seen; it is possible that the
+ // COMPLETE method is seen first!
+ if (state == State.WAIT_COMPLETE)
+ closeSession(State.COMPLETE);
+ else
+ state(State.WAIT_COMPLETE);
+ }
+
/**
* Call back on receiving {@code StreamMessage.Type.SESSION_FAILED} message.
*/
@@ -988,6 +1011,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
// pass the session planId/index to the OFM (which is only set at init(), after the transfers have already been created)
ofm.header.addSessionInfo(this);
+ // do not sync here as this does disk access
channel.sendControlMessage(ofm);
}
}
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
index 3403a24..ff1e61f 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java
@@ -29,7 +29,6 @@ import org.apache.cassandra.streaming.StreamingDataOutputPlus;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReceiveException;
import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.JVMStabilityInspector;
public class IncomingStreamMessage extends StreamMessage
{
@@ -54,7 +53,9 @@ public class IncomingStreamMessage extends StreamMessage
}
catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(t);
+ if (t instanceof StreamReceiveException)
+ throw (StreamReceiveException) t;
+ // make sure to wrap so the caller always has access to the session to call onError
throw new StreamReceiveException(session, t);
}
}
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index f8acb22..1d3c09f 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -140,15 +140,26 @@ public final class JVMStabilityInspector
if (t instanceof FSError || t instanceof CorruptSSTableException)
isUnstable = true;
- fn.accept(t);
-
// Check for file handle exhaustion
if (t instanceof FileNotFoundException || t instanceof FileSystemException || t instanceof SocketException)
if (t.getMessage() != null && t.getMessage().contains("Too many open files"))
isUnstable = true;
if (isUnstable)
+ {
+ if (!StorageService.instance.isDaemonSetupCompleted())
+ FileUtils.handleStartupFSError(t);
killer.killCurrentJVM(t);
+ }
+
+ try
+ {
+ fn.accept(t);
+ }
+ catch (Exception | Error e)
+ {
+ logger.warn("Unexpected error while handling unexpected error", e);
+ }
if (t.getCause() != null)
inspectThrowable(t.getCause(), fn);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 753f874..70d9d01 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -50,7 +50,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-
import javax.annotation.concurrent.GuardedBy;
import com.google.common.collect.ImmutableSet;
@@ -82,18 +81,18 @@ import org.apache.cassandra.distributed.api.LogAction;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
-import org.apache.cassandra.utils.Isolated;
import org.apache.cassandra.distributed.shared.MessageFilters;
import org.apache.cassandra.distributed.shared.Metrics;
import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.distributed.shared.ShutdownException;
import org.apache.cassandra.distributed.shared.Versions;
import org.apache.cassandra.io.util.PathUtils;
import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Isolated;
+import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.Shared.Recursive;
import org.apache.cassandra.utils.concurrent.Condition;
-import org.apache.cassandra.utils.FBUtilities;
import org.reflections.Reflections;
import org.reflections.scanners.TypeAnnotationsScanner;
import org.reflections.util.ConfigurationBuilder;
@@ -273,12 +272,14 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
public boolean isShutdown()
{
- return isShutdown;
+ IInvokableInstance delegate = this.delegate;
+ // if the instance shuts down on its own, detect that
+ return isShutdown || (delegate != null && delegate.isShutdown());
}
private boolean isRunning()
{
- return !isShutdown;
+ return !isShutdown();
}
@Override
@@ -301,6 +302,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
if (isRunning())
throw new IllegalStateException("Can not start a instance that is already running");
isShutdown = false;
+ // if the delegate isn't running, remove so it can be recreated
+ if (delegate != null && delegate.isShutdown())
+ delegate = null;
if (!broadcastAddress.equals(config.broadcastAddress()))
{
// previous address != desired address, so cleanup
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 8ed9083..e5a933d 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -610,7 +610,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
registerInboundFilter(cluster);
registerOutboundFilter(cluster);
- JVMStabilityInspector.replaceKiller(new InstanceKiller());
+ JVMStabilityInspector.replaceKiller(new InstanceKiller(Instance.this::shutdown));
// TODO: this is more than just gossip
StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
index e7ca49b..38b045b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceKiller.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.impl;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -26,6 +27,13 @@ public class InstanceKiller extends JVMStabilityInspector.Killer
{
private static final AtomicLong KILL_ATTEMPTS = new AtomicLong(0);
+ private final Consumer<Boolean> onKill;
+
+ public InstanceKiller(Consumer<Boolean> onKill)
+ {
+ this.onKill = onKill != null ? onKill : ignore -> {};
+ }
+
public static long getKillAttempts()
{
return KILL_ATTEMPTS.get();
@@ -40,6 +48,7 @@ public class InstanceKiller extends JVMStabilityInspector.Killer
protected void killCurrentJVM(Throwable t, boolean quiet)
{
KILL_ATTEMPTS.incrementAndGet();
+ onKill.accept(quiet);
// the bad part is that System.exit kills the JVM, so all code which calls kill won't hit the
// next line; yet in in-JVM dtests System.exit is not desirable, so need to rely on a runtime exception
// as a means to try to stop execution
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
index 7feefa3..b95f854 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.io.Serializable;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -45,8 +46,6 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
@@ -55,10 +54,12 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
-import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
import org.apache.cassandra.distributed.impl.InstanceKiller;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -72,6 +73,7 @@ import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import org.apache.cassandra.service.StorageService;
+import org.awaitility.Awaitility;
@RunWith(Parameterized.class)
public class FailingRepairTest extends TestBaseImpl implements Serializable
@@ -162,7 +164,12 @@ public class FailingRepairTest extends TestBaseImpl implements Serializable
public void cleanupState()
{
for (int i = 1; i <= CLUSTER.size(); i++)
- CLUSTER.get(i).runOnInstance(InstanceKiller::clear);
+ {
+ IInvokableInstance inst = CLUSTER.get(i);
+ if (inst.isShutdown())
+ inst.startup();
+ inst.runOnInstance(InstanceKiller::clear);
+ }
}
@Test(timeout = 10 * 60 * 1000)
@@ -240,10 +247,7 @@ public class FailingRepairTest extends TestBaseImpl implements Serializable
// its possible that the coordinator gets the message that the replica failed before the replica completes
// shutting down; this then means that isKilled could be updated after the fact
IInvokableInstance replicaInstance = CLUSTER.get(replica);
- while (replicaInstance.killAttempts() <= 0)
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-
- Assert.assertEquals("replica should be killed", 1, replicaInstance.killAttempts());
+ Awaitility.await().atMost(Duration.ofSeconds(30)).until(replicaInstance::isShutdown);
Assert.assertEquals("coordinator should not be killed", 0, CLUSTER.get(coordinator).killAttempts());
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
index e42f87c..72fd7f9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
@@ -121,7 +121,10 @@ public class StreamingTest extends TestBaseImpl
// verify on follower's stream session
MessageStateSinkImpl followerSink = new MessageStateSinkImpl();
followerSink.messages(initiator, Arrays.asList(STREAM_INIT, PREPARE_SYN, PREPARE_ACK, RECEIVED));
- followerSink.states(initiator, Arrays.asList(PREPARING, STREAMING, StreamSession.State.COMPLETE));
+ // why 2 completes? There is a race condition bug with sending COMPLETE where the socket gets closed
+ // by the initiator, which then triggers a ClosedChannelException, which then checks the current state (PREPARING)
+ // to solve this, COMPLETE is set before sending the message, and reset when closing the stream
+ followerSink.states(initiator, Arrays.asList(PREPARING, STREAMING, StreamSession.State.COMPLETE, StreamSession.State.COMPLETE));
followerNode.runOnInstance(() -> StreamSession.sink = followerSink);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamCloseInMiddleTest.java b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamCloseInMiddleTest.java
new file mode 100644
index 0000000..fc52ab6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamCloseInMiddleTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class StreamCloseInMiddleTest extends TestBaseImpl
+{
+ @Test
+ public void zeroCopy() throws IOException
+ {
+ streamClose(true);
+ }
+
+ @Test
+ public void notZeroCopy() throws IOException
+ {
+ streamClose(false);
+ }
+
+ private void streamClose(boolean zeroCopyStreaming) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
+ .withInstanceInitializer(BBHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+ .set("stream_entire_sstables", zeroCopyStreaming)
+ // when set to die, this will try to halt the JVM, which is easier to validate in the test
+ // other levels require checking state of the subsystems
+ .set("disk_failure_policy", "die"))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY)"));
+
+ triggerStreaming(cluster, zeroCopyStreaming);
+ // make sure disk failure policy is not triggered
+ assertNoNodeShutdown(cluster);
+
+ // now bootstrap a new node; streaming will fail
+ IInvokableInstance node3 = ClusterUtils.addInstance(cluster, cluster.get(1).config(), c -> c.set("auto_bootstrap", true));
+ node3.startup();
+ for (String line : Arrays.asList("Error while waiting on bootstrap to complete. Bootstrap will have to be restarted", // bootstrap failed
+ "Some data streaming failed. Use nodetool to check bootstrap state and resume")) // didn't join ring because bootstrap failed
+ Assertions.assertThat(node3.logs().grep(line).getResult())
+ .hasSize(1);
+
+ assertNoNodeShutdown(cluster);
+ }
+ }
+
+ private void assertNoNodeShutdown(Cluster cluster)
+ {
+ AssertionError t = null;
+ for (IInvokableInstance i : cluster.stream().collect(Collectors.toList()))
+ {
+ try
+ {
+ Assertions.assertThat(i.isShutdown()).describedAs("%s was shutdown; this is not expected", i).isFalse();
+ Assertions.assertThat(i.killAttempts()).describedAs("%s saw kill attempts; this is not expected", i).isEqualTo(0);
+ }
+ catch (AssertionError t2)
+ {
+ if (t == null)
+ t = t2;
+ else
+ t.addSuppressed(t2);
+ }
+ }
+ if (t != null)
+ throw t;
+ }
+
+ private static void triggerStreaming(Cluster cluster, boolean expectedEntireSSTable)
+ {
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+
+ // repair will do streaming IFF there is a mismatch; so cause one
+ for (int i = 0; i < 10; i++)
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES (?)"), i); // timestamp won't match, causing a mismatch
+
+ // trigger streaming; expected to fail as streaming socket closed in the middle (currently this is an unrecoverable event)
+ node2.nodetoolResult("repair", "-full", KEYSPACE, "tbl").asserts().failure();
+
+ assertStreamingType(node2, expectedEntireSSTable);
+ }
+
+ private static void assertStreamingType(IInvokableInstance node, boolean expectedEntireSSTable)
+ {
+ String key = "org.apache.cassandra.metrics.Streaming.%s./127.0.0.1.7012";
+ long entire = node.metrics().getCounter(String.format(key, "EntireSSTablesStreamedIn"));
+ long partial = node.metrics().getCounter(String.format(key, "PartialSSTablesStreamedIn"));
+ if (expectedEntireSSTable)
+ {
+ Assertions.assertThat(partial).isEqualTo(0);
+ Assertions.assertThat(entire).isGreaterThan(0);
+ }
+ else
+ {
+ Assertions.assertThat(partial).isGreaterThan(0);
+ Assertions.assertThat(entire).isEqualTo(0);
+ }
+ }
+
+ public static class BBHelper
+ {
+ @SuppressWarnings("unused")
+ public static int writeDirectlyToChannel(ByteBuffer buf, @SuperCall Callable<Integer> zuper) throws Exception
+ {
+ if (isCaller(BigTableZeroCopyWriter.class.getName(), "write"))
+ throw new java.nio.channels.ClosedChannelException();
+ // different context; pass through
+ return zuper.call();
+ }
+
+ @SuppressWarnings("unused")
+ public static boolean append(UnfilteredRowIterator partition, @SuperCall Callable<Boolean> zuper) throws Exception
+ {
+ if (isCaller(CassandraIncomingFile.class.getName(), "read")) // handles compressed and non-compressed
+ throw new java.nio.channels.ClosedChannelException();
+ // different context; pass through
+ return zuper.call();
+ }
+
+ private static boolean isCaller(String klass, String method)
+ {
+ //TODO is there a cleaner way to check this?
+ StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ for (int i = 0; i < stack.length; i++)
+ {
+ StackTraceElement e = stack[i];
+ if (klass.equals(e.getClassName()) && method.equals(e.getMethodName()))
+ return true;
+ }
+ return false;
+ }
+
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ new ByteBuddy().rebase(SequentialWriter.class)
+ .method(named("writeDirectlyToChannel").and(takesArguments(1)))
+ .intercept(MethodDelegation.to(BBHelper.class))
+ .make()
+ .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+
+ new ByteBuddy().rebase(RangeAwareSSTableWriter.class)
+ .method(named("append").and(takesArguments(1)))
+ .intercept(MethodDelegation.to(BBHelper.class))
+ .make()
+ .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
index f62936c..35c4fb3 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeGossipTest.java
@@ -51,8 +51,8 @@ public class MixedModeGossipTest extends UpgradeTestBase
.nodes(3)
.nodesToUpgradeOrdered(1, 2, 3)
// all upgrades from v30 up, excluding v30->v3X and from v40
- .singleUpgrade(v30, v40)
- .singleUpgrade(v3X, v40)
+ .singleUpgrade(v30)
+ .singleUpgrade(v3X)
.setup(c -> {})
.runAfterNodeUpgrade((cluster, node) -> {
if (node == 1) {
@@ -87,8 +87,8 @@ public class MixedModeGossipTest extends UpgradeTestBase
.nodes(3)
.nodesToUpgradeOrdered(1, 2, 3)
// all upgrades from v30 up, excluding v30->v3X and from v40
- .singleUpgrade(v30, v40)
- .singleUpgrade(v3X, v40)
+ .singleUpgrade(v30)
+ .singleUpgrade(v3X)
.setup(cluster -> {
// node2 and node3 gossiper cannot talk with each other
cluster.filters().verbs(Verb.GOSSIP_DIGEST_SYN.id).from(2).to(3).drop();
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
index 72b7f1a..8010853 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@ -83,7 +83,7 @@ public class MixedModeMessageForwardTest extends UpgradeTestBase
.withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK).set("request_timeout_in_ms", 30000))
.withBuilder(b -> b.withRacks(numDCs, 1, nodesPerDc))
.nodes(numDCs * nodesPerDc)
- .singleUpgrade(v30, v40)
+ .singleUpgrade(v30)
.setup(cluster -> {
cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE +
" WITH replication = {'class': 'NetworkTopologyStrategy', " + ntsArgs + " };");
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
index 631f65a..b11678d 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReadTest.java
@@ -40,8 +40,8 @@ public class MixedModeReadTest extends UpgradeTestBase
.nodes(2)
.nodesToUpgrade(1)
// all upgrades from v30 up, excluding v30->v3X and from v40
- .singleUpgrade(v30, v40)
- .singleUpgrade(v3X, v40)
+ .singleUpgrade(v30)
+ .singleUpgrade(v3X)
.setup(cluster -> {
cluster.schemaChange(CREATE_TABLE);
insertData(cluster.coordinator(1));
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRepairTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRepairTest.java
index 478a1b7..813d9f2 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeRepairTest.java
@@ -54,7 +54,7 @@ public class MixedModeRepairTest extends UpgradeTestBase
new UpgradeTestBase.TestCase()
.nodes(2)
.nodesToUpgrade(UPGRADED_NODE)
- .singleUpgrade(v3X, v40)
+ .singleUpgrade(v3X)
.withConfig(config -> config.with(NETWORK, GOSSIP))
.setup(cluster -> {
cluster.schemaChange(CREATE_TABLE);
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
index 3e6c9ca..4cca7b9 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/Pre40MessageFilterTest.java
@@ -35,7 +35,7 @@ public class Pre40MessageFilterTest extends UpgradeTestBase
.withConfig(configConsumer)
.nodesToUpgrade(1)
// all upgrades from v30 up, excluding v30->v3X
- .singleUpgrade(v30, v40)
+ .singleUpgrade(v30)
.upgradesFrom(v3X)
.setup((cluster) -> {
cluster.filters().outbound().allVerbs().messagesMatching((f,t,m) -> false).drop();
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 768c6cc..ce0d8e9 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -32,6 +32,9 @@ import com.vdurmont.semver4j.Semver.SemverType;
import org.junit.After;
import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.UpgradeableCluster;
@@ -41,6 +44,7 @@ import org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.distributed.shared.ThrowingRunnable;
import org.apache.cassandra.distributed.shared.Versions;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.distributed.shared.Versions.Version;
@@ -50,6 +54,8 @@ import static org.apache.cassandra.distributed.shared.Versions.find;
public class UpgradeTestBase extends DistributedTestBase
{
+ private static final Logger logger = LoggerFactory.getLogger(UpgradeTestBase.class);
+
@After
public void afterEach()
{
@@ -157,9 +163,9 @@ public class UpgradeTestBase extends DistributedTestBase
}
/** Will test this specific upgrade path **/
- public TestCase singleUpgrade(Semver from, Semver to)
+ public TestCase singleUpgrade(Semver from)
{
- this.upgrade.add(new TestVersions(versions.getLatest(from), versions.getLatest(to)));
+ this.upgrade.add(new TestVersions(versions.getLatest(from), versions.getLatest(CURRENT)));
return this;
}
@@ -219,7 +225,7 @@ public class UpgradeTestBase extends DistributedTestBase
for (TestVersions upgrade : this.upgrade)
{
- System.out.printf("testing upgrade from %s to %s%n", upgrade.initial.version, upgrade.upgrade.version);
+ logger.info("testing upgrade from {} to {}", upgrade.initial.version, upgrade.upgrade.version);
try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial, configConsumer, builderConsumer)))
{
setup.run(cluster);
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
index d9bbf10..84c6415 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -241,7 +241,7 @@ public class ZeroCopyStreamingBenchmark
@Benchmark
@BenchmarkMode(Mode.Throughput)
- public void blockStreamReader(BenchmarkState state) throws Exception
+ public void blockStreamReader(BenchmarkState state) throws Throwable
{
EmbeddedChannel channel = createMockNettyChannel();
AsyncStreamingInputPlus in = new AsyncStreamingInputPlus(channel);
@@ -265,7 +265,7 @@ public class ZeroCopyStreamingBenchmark
@Benchmark
@BenchmarkMode(Mode.Throughput)
- public void partialStreamReader(BenchmarkState state) throws Exception
+ public void partialStreamReader(BenchmarkState state) throws Throwable
{
EmbeddedChannel channel = createMockNettyChannel();
AsyncStreamingInputPlus in = new AsyncStreamingInputPlus(channel);
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
index bf7c148..c3ffe1f 100644
--- a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -129,7 +129,7 @@ public class CassandraEntireSSTableStreamWriterTest
}
@Test
- public void testBlockReadingAndWritingOverWire() throws Exception
+ public void testBlockReadingAndWritingOverWire() throws Throwable
{
StreamSession session = setupStreamingSessionForTest();
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
diff --git a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
index 4d2242b..edd2c24 100644
--- a/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
+++ b/test/unit/org/apache/cassandra/db/streaming/EntireSSTableStreamConcurrentComponentMutationTest.java
@@ -160,7 +160,7 @@ public class EntireSSTableStreamConcurrentComponentMutationTest
}
@Test
- public void testStream() throws Exception
+ public void testStream() throws Throwable
{
testStreamWithConcurrentComponentMutation(NO_OP, NO_OP);
}
@@ -170,7 +170,7 @@ public class EntireSSTableStreamConcurrentComponentMutationTest
* update causes the actual transfered file size to be different from the one in {@link ComponentManifest}
*/
@Test
- public void testStreamWithStatsMutation() throws Exception
+ public void testStreamWithStatsMutation() throws Throwable
{
testStreamWithConcurrentComponentMutation(() -> {
@@ -188,7 +188,7 @@ public class EntireSSTableStreamConcurrentComponentMutationTest
targetLocation = "AFTER INVOKE serialize",
condition = "$descriptor.cfname.contains(\"Standard1\")",
action = "org.apache.cassandra.db.streaming.EntireSSTableStreamConcurrentComponentMutationTest.countDown();Thread.sleep(5000);")
- public void testStreamWithIndexSummaryRedistributionDelaySavingSummary() throws Exception
+ public void testStreamWithIndexSummaryRedistributionDelaySavingSummary() throws Throwable
{
testStreamWithConcurrentComponentMutation(() -> {
// wait until new index summary is partially written
@@ -203,7 +203,7 @@ public class EntireSSTableStreamConcurrentComponentMutationTest
latch.countDown();
}
- private void testStreamWithConcurrentComponentMutation(Callable<?> runBeforeStreaming, Callable<?> runConcurrentWithStreaming) throws Exception
+ private void testStreamWithConcurrentComponentMutation(Callable<?> runBeforeStreaming, Callable<?> runConcurrentWithStreaming) throws Throwable
{
ByteBuf serializedFile = Unpooled.buffer(8192);
InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
index d3aed25..ddd3a13 100644
--- a/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriterTest.java
@@ -19,7 +19,9 @@
package org.apache.cassandra.io.sstable.format.big;
import java.io.ByteArrayInputStream;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
@@ -161,7 +163,14 @@ public class BigTableZeroCopyWriterTest
{
Pair<DataInputPlus, Long> pair = getSSTableComponentData(sstable, component, bufferMapper);
- btzcw.writeComponent(component.type, pair.left, pair.right);
+ try
+ {
+ btzcw.writeComponent(component.type, pair.left, pair.right);
+ }
+ catch (ClosedChannelException e)
+ {
+ throw new UncheckedIOException(e);
+ }
}
}
diff --git a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
index f86be38..3a3415e 100644
--- a/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
+++ b/test/unit/org/apache/cassandra/utils/JVMStabilityInspectorTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.utils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketException;
+import java.util.Arrays;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -30,6 +31,10 @@ import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.assertj.core.api.Assertions;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.DefaultFSErrorHandler;
+import org.apache.cassandra.service.StorageService;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
@@ -54,43 +59,62 @@ public class JVMStabilityInspectorTest
Config.DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
Config.CommitFailurePolicy oldCommitPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+ FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
try
{
- killerForTests.reset();
- JVMStabilityInspector.inspectThrowable(new IOException());
- assertFalse(killerForTests.wasKilled());
-
- DatabaseDescriptor.setDiskFailurePolicy(Config.DiskFailurePolicy.die);
- killerForTests.reset();
- JVMStabilityInspector.inspectThrowable(new FSReadError(new IOException(), "blah"));
- assertTrue(killerForTests.wasKilled());
-
- killerForTests.reset();
- JVMStabilityInspector.inspectThrowable(new FSWriteError(new IOException(), "blah"));
- assertTrue(killerForTests.wasKilled());
-
- killerForTests.reset();
- JVMStabilityInspector.inspectThrowable(new CorruptSSTableException(new IOException(), "blah"));
- assertTrue(killerForTests.wasKilled());
-
- killerForTests.reset();
- JVMStabilityInspector.inspectThrowable(new RuntimeException(new CorruptSSTableException(new IOException(), "blah")));
- assertTrue(killerForTests.wasKilled());
-
- DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
- killerForTests.reset();
- JVMStabilityInspector.inspectCommitLogThrowable(new Throwable());
- assertTrue(killerForTests.wasKilled());
-
- killerForTests.reset();
- JVMStabilityInspector.inspectThrowable(new Exception(new IOException()));
- assertFalse(killerForTests.wasKilled());
+ CassandraDaemon daemon = new CassandraDaemon();
+ daemon.completeSetup();
+ for (boolean daemonSetupCompleted : Arrays.asList(false, true))
+ {
+ // disk policy acts differently depending on if setup is complete or not; which is defined by
+ // the daemon thread not being null
+ StorageService.instance.registerDaemon(daemonSetupCompleted ? daemon : null);
+
+ try
+ {
+ killerForTests.reset();
+ JVMStabilityInspector.inspectThrowable(new IOException());
+ assertFalse(killerForTests.wasKilled());
+
+ DatabaseDescriptor.setDiskFailurePolicy(Config.DiskFailurePolicy.die);
+ killerForTests.reset();
+ JVMStabilityInspector.inspectThrowable(new FSReadError(new IOException(), "blah"));
+ assertTrue(killerForTests.wasKilled());
+
+ killerForTests.reset();
+ JVMStabilityInspector.inspectThrowable(new FSWriteError(new IOException(), "blah"));
+ assertTrue(killerForTests.wasKilled());
+
+ killerForTests.reset();
+ JVMStabilityInspector.inspectThrowable(new CorruptSSTableException(new IOException(), "blah"));
+ assertTrue(killerForTests.wasKilled());
+
+ killerForTests.reset();
+ JVMStabilityInspector.inspectThrowable(new RuntimeException(new CorruptSSTableException(new IOException(), "blah")));
+ assertTrue(killerForTests.wasKilled());
+
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
+ killerForTests.reset();
+ JVMStabilityInspector.inspectCommitLogThrowable(new Throwable());
+ assertTrue(killerForTests.wasKilled());
+
+ killerForTests.reset();
+ JVMStabilityInspector.inspectThrowable(new Exception(new IOException()));
+ assertFalse(killerForTests.wasKilled());
+ }
+ catch (Exception | Error e)
+ {
+ throw new AssertionError("Failure when daemonSetupCompleted=" + daemonSetupCompleted, e);
+ }
+ }
}
finally
{
JVMStabilityInspector.replaceKiller(originalKiller);
DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
DatabaseDescriptor.setCommitFailurePolicy(oldCommitPolicy);
+ StorageService.instance.registerDaemon(null);
+ FileUtils.setFSErrorHandler(null);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org