You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ms...@apache.org on 2016/08/16 22:42:04 UTC
[03/50] [abbrv] cassandra git commit: Fix hanging stream session
Fix hanging stream session
by preventing CompressedStreamReader from blocking on IOException.
Also removed retry support from streaming.
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-10992
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76e3100f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76e3100f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76e3100f
Branch: refs/heads/cassandra-3.8
Commit: 76e3100ffb106cab3cc665404e293c1026e5e65c
Parents: bc9af92
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Jun 23 11:33:54 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Aug 9 16:31:34 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/Config.java | 4 ++
.../cassandra/config/DatabaseDescriptor.java | 5 --
.../cassandra/streaming/StreamReader.java | 26 +----------
.../cassandra/streaming/StreamSession.java | 36 +-------------
.../compress/CompressedInputStream.java | 21 ++++++++-
.../compress/CompressedStreamReader.java | 10 ++--
.../streaming/messages/IncomingFileMessage.java | 22 ++-------
.../streaming/messages/RetryMessage.java | 4 ++
.../org/apache/cassandra/utils/Throwables.java | 17 +++++++
.../compress/CompressedInputStreamTest.java | 49 +++++++++++++++++---
11 files changed, 102 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f734476..232203e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Fix hanging stream session (CASSANDRA-10992)
* Add byteman support for testing (CASSANDRA-12377)
* Fix INSERT JSON, fromJson() support of smallint, tinyint types (CASSANDRA-12371)
* Restore JVM metric export for metric reporters (CASSANDRA-12312)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index ede4560..60daee6 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -171,6 +171,10 @@ public class Config
public volatile Integer compaction_throughput_mb_per_sec = 16;
public volatile Integer compaction_large_partition_warning_threshold_mb = 100;
+ /**
+ * @deprecated retry support was removed by CASSANDRA-10992
+ */
+ @Deprecated
public Integer max_streaming_retries = 3;
public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index f1acfc4..6e46725 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -957,11 +957,6 @@ public class DatabaseDescriptor
return conf.cluster_name;
}
- public static int getMaxStreamingRetries()
- {
- return conf.max_streaming_retries;
- }
-
public static int getStoragePort()
{
return Integer.parseInt(System.getProperty("cassandra.storage_port", conf.storage_port.toString()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 8789720..c96ea22 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
/**
* StreamReader reads from stream and writes to SSTable.
@@ -137,11 +138,7 @@ public class StreamReader
e.addSuppressed(e2);
}
}
- drain(dis, in.getBytesRead());
- if (e instanceof IOException)
- throw (IOException) e;
- else
- throw Throwables.propagate(e);
+ throw Throwables.propagate(e);
}
}
@@ -155,25 +152,6 @@ public class StreamReader
return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
}
- protected void drain(InputStream dis, long bytesRead) throws IOException
- {
- long toSkip = totalSize() - bytesRead;
-
- // InputStream.skip can return -1 if dis is inaccessible.
- long skipped = dis.skip(toSkip);
- if (skipped == -1)
- return;
-
- toSkip = toSkip - skipped;
- while (toSkip > 0)
- {
- skipped = dis.skip(toSkip);
- if (skipped == -1)
- break;
- toSkip = toSkip - skipped;
- }
- }
-
protected long totalSize()
{
long size = 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 294b9c1..0f43f1f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -105,10 +105,8 @@ import org.apache.cassandra.utils.concurrent.Refs;
* complete (received()). When all files for the StreamReceiveTask have been received, the sstables
* are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task
* is marked complete (taskCompleted())
- * (b) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream
- * (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries())
- * by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issue a new
- * FileMessage for that file.
+ * (b) If during the streaming of a particular file an error occurs on the receiving end of a stream
+ * (FileMessage.deserialize), the node will send a SessionFailedMessage to the sender and close the stream session.
* (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase
* (maybeCompleted()).
*
@@ -149,8 +147,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
public final ConnectionHandler handler;
- private int retries;
-
private AtomicBoolean isAborted = new AtomicBoolean(false);
private final boolean keepSSTableLevel;
private final boolean isIncremental;
@@ -481,11 +477,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
received(received.cfId, received.sequenceNumber);
break;
- case RETRY:
- RetryMessage retry = (RetryMessage) message;
- retry(retry.cfId, retry.sequenceNumber);
- break;
-
case COMPLETE:
complete();
break;
@@ -610,18 +601,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
}
/**
- * Call back on receiving {@code StreamMessage.Type.RETRY} message.
- *
- * @param cfId ColumnFamily ID
- * @param sequenceNumber Sequence number to indicate which file to stream again
- */
- public void retry(UUID cfId, int sequenceNumber)
- {
- OutgoingFileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber);
- handler.sendMessage(message);
- }
-
- /**
* Check if session is completed on receiving {@code StreamMessage.Type.COMPLETE} message.
*/
public synchronized void complete()
@@ -651,17 +630,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
closeSession(State.FAILED);
}
- public void doRetry(FileMessageHeader header, Throwable e)
- {
- logger.warn("[Stream #{}] Retrying for following error", planId(), e);
- // retry
- retries++;
- if (retries > DatabaseDescriptor.getMaxStreamingRetries())
- onError(new IOException("Too many retries for " + header, e));
- else
- handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber));
- }
-
/**
* @return Current snapshot of this session info.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 489fed9..d08ffa9 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -63,8 +63,17 @@ public class CompressedInputStream extends InputStream
// raw checksum bytes
private final byte[] checksumBytes = new byte[4];
+ /**
+ * Sentinel indicating that reading from the source stream failed.
+ * Before the stream Reader adds it to the <code>dataBuffer</code>,
+ * the <code>readException</code> field is expected to be populated
+ * with the cause of the read failure, so that the cause can be
+ * thrown to the consumer on its subsequent read operation.
+ */
private static final byte[] POISON_PILL = new byte[0];
+ protected volatile IOException readException = null;
+
private long totalCompressedBytesRead;
/**
@@ -84,13 +93,19 @@ public class CompressedInputStream extends InputStream
public int read() throws IOException
{
+ if (readException != null)
+ throw readException;
+
if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
{
try
{
byte[] compressedWithCRC = dataBuffer.take();
if (compressedWithCRC == POISON_PILL)
- throw new EOFException("No chunk available");
+ {
+ assert readException != null;
+ throw readException;
+ }
decompress(compressedWithCRC);
}
catch (InterruptedException e)
@@ -138,7 +153,7 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
- static class Reader extends WrappedRunnable
+ class Reader extends WrappedRunnable
{
private final InputStream source;
private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -169,6 +184,7 @@ public class CompressedInputStream extends InputStream
int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
if (r < 0)
{
+ readException = new EOFException("No chunk available");
dataBuffer.put(POISON_PILL);
return; // throw exception where we consume dataBuffer
}
@@ -177,6 +193,7 @@ public class CompressedInputStream extends InputStream
catch (IOException e)
{
logger.warn("Error while reading compressed input stream.", e);
+ readException = e;
dataBuffer.put(POISON_PILL);
return; // throw exception where we consume dataBuffer
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index c684e4f..fa1022d 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -44,6 +44,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.BytesReadTracker;
import org.apache.cassandra.utils.Pair;
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
/**
* StreamReader that reads from streamed compressed SSTable
*/
@@ -132,11 +134,9 @@ public class CompressedStreamReader extends StreamReader
e.addSuppressed(e2);
}
}
- drain(cis, in.getBytesRead());
- if (e instanceof IOException)
- throw (IOException) e;
- else
- throw Throwables.propagate(e);
+ if (extractIOExceptionCause(e).isPresent())
+ throw e;
+ throw Throwables.propagate(e);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 31ab2a8..2870c03 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import com.google.common.base.Optional;
+
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamReader;
@@ -29,6 +31,8 @@ import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.compress.CompressedStreamReader;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
/**
* IncomingFileMessage is used to receive the part(or whole) of a SSTable data file.
*/
@@ -48,26 +52,10 @@ public class IncomingFileMessage extends StreamMessage
{
return new IncomingFileMessage(reader.read(in), header);
}
- catch (IOException eof)
- {
- // Reading from remote failed(i.e. reached EOF before reading expected length of data).
- // This can be caused by network/node failure thus we are not retrying
- throw eof;
- }
catch (Throwable t)
{
- // Throwable can be Runtime error containing IOException.
- // In that case we don't want to retry.
- Throwable cause = t;
- while ((cause = cause.getCause()) != null)
- {
- if (cause instanceof IOException)
- throw (IOException) cause;
- }
JVMStabilityInspector.inspectThrowable(t);
- // Otherwise, we can retry
- session.doRetry(header, t);
- return null;
+ throw t;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
index 29e84bf..6673aa1 100644
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -27,6 +27,10 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.UUIDSerializer;
+/**
+ * @deprecated retry support was removed by CASSANDRA-10992
+ */
+@Deprecated
public class RetryMessage extends StreamMessage
{
public static Serializer<RetryMessage> serializer = new Serializer<RetryMessage>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index a895f31..877f388 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -18,6 +18,10 @@
*/
package org.apache.cassandra.utils;
+import java.io.IOException;
+
+import com.google.common.base.Optional;
+
public class Throwables
{
@@ -50,4 +54,17 @@ public class Throwables
}
return accumulate;
}
+
+ public static Optional<IOException> extractIOExceptionCause(Throwable t)
+ {
+ if (t instanceof IOException)
+ return Optional.of((IOException) t);
+ Throwable cause = t;
+ while ((cause = cause.getCause()) != null)
+ {
+ if (cause instanceof IOException)
+ return Optional.of((IOException) cause);
+ }
+ return Optional.absent();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0becd18..87d93fd 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.Pair;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
*/
@@ -44,24 +45,33 @@ public class CompressedInputStreamTest
@Test
public void testCompressedRead() throws Exception
{
- testCompressedReadWith(new long[]{0L}, false);
- testCompressedReadWith(new long[]{1L}, false);
- testCompressedReadWith(new long[]{100L}, false);
+ testCompressedReadWith(new long[]{0L}, false, false);
+ testCompressedReadWith(new long[]{1L}, false, false);
+ testCompressedReadWith(new long[]{100L}, false, false);
- testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, false);
}
@Test(expected = EOFException.class)
public void testTruncatedRead() throws Exception
{
- testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true, false);
+ }
+
+ /**
+ * Test that CompressedInputStream does not block when reading from the source stream fails
+ */
+ @Test(timeout = 30000)
+ public void testException() throws Exception
+ {
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, true);
}
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
*/
- private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
+ private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate, boolean testException) throws Exception
{
assert valuesToCheck != null && valuesToCheck.length > 0;
@@ -120,6 +130,12 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
+
+ if (testException)
+ {
+ testException(sections, info);
+ return;
+ }
CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
try (DataInputStream in = new DataInputStream(input))
@@ -132,4 +148,25 @@ public class CompressedInputStreamTest
}
}
}
+
+ private static void testException(List<Pair<Long, Long>> sections, CompressionInfo info) throws IOException
+ {
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(new byte[0]), info);
+
+ try (DataInputStream in = new DataInputStream(input))
+ {
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ try {
+ in.readLong();
+ fail("Should have thrown IOException");
+ }
+ catch (IOException e)
+ {
+ continue;
+ }
+ }
+ }
+ }
}