You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ms...@apache.org on 2016/08/16 22:42:04 UTC

[03/50] [abbrv] cassandra git commit: Fix hanging stream session

Fix hanging stream session

The hang is fixed by preventing CompressedStreamReader from blocking on IOException.
Also removed retry support from streaming.

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-10992


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/76e3100f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/76e3100f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/76e3100f

Branch: refs/heads/cassandra-3.8
Commit: 76e3100ffb106cab3cc665404e293c1026e5e65c
Parents: bc9af92
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Jun 23 11:33:54 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Aug 9 16:31:34 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/config/Config.java     |  4 ++
 .../cassandra/config/DatabaseDescriptor.java    |  5 --
 .../cassandra/streaming/StreamReader.java       | 26 +----------
 .../cassandra/streaming/StreamSession.java      | 36 +-------------
 .../compress/CompressedInputStream.java         | 21 ++++++++-
 .../compress/CompressedStreamReader.java        | 10 ++--
 .../streaming/messages/IncomingFileMessage.java | 22 ++-------
 .../streaming/messages/RetryMessage.java        |  4 ++
 .../org/apache/cassandra/utils/Throwables.java  | 17 +++++++
 .../compress/CompressedInputStreamTest.java     | 49 +++++++++++++++++---
 11 files changed, 102 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f734476..232203e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Fix hanging stream session (CASSANDRA-10992)
  * Add byteman support for testing (CASSANDRA-12377)
  * Fix INSERT JSON, fromJson() support of smallint, tinyint types (CASSANDRA-12371)
  * Restore JVM metric export for metric reporters (CASSANDRA-12312)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index ede4560..60daee6 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -171,6 +171,10 @@ public class Config
     public volatile Integer compaction_throughput_mb_per_sec = 16;
     public volatile Integer compaction_large_partition_warning_threshold_mb = 100;
 
+    /**
+     * @deprecated retry support removed on CASSANDRA-10992
+     */
+    @Deprecated
     public Integer max_streaming_retries = 3;
 
     public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index f1acfc4..6e46725 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -957,11 +957,6 @@ public class DatabaseDescriptor
         return conf.cluster_name;
     }
 
-    public static int getMaxStreamingRetries()
-    {
-        return conf.max_streaming_retries;
-    }
-
     public static int getStoragePort()
     {
         return Integer.parseInt(System.getProperty("cassandra.storage_port", conf.storage_port.toString()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 8789720..c96ea22 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
 
 /**
  * StreamReader reads from stream and writes to SSTable.
@@ -137,11 +138,7 @@ public class StreamReader
                     e.addSuppressed(e2);
                 }
             }
-            drain(dis, in.getBytesRead());
-            if (e instanceof IOException)
-                throw (IOException) e;
-            else
-                throw Throwables.propagate(e);
+            throw Throwables.propagate(e);
         }
     }
 
@@ -155,25 +152,6 @@ public class StreamReader
         return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
     }
 
-    protected void drain(InputStream dis, long bytesRead) throws IOException
-    {
-        long toSkip = totalSize() - bytesRead;
-
-        // InputStream.skip can return -1 if dis is inaccessible.
-        long skipped = dis.skip(toSkip);
-        if (skipped == -1)
-            return;
-
-        toSkip = toSkip - skipped;
-        while (toSkip > 0)
-        {
-            skipped = dis.skip(toSkip);
-            if (skipped == -1)
-                break;
-            toSkip = toSkip - skipped;
-        }
-    }
-
     protected long totalSize()
     {
         long size = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 294b9c1..0f43f1f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -105,10 +105,8 @@ import org.apache.cassandra.utils.concurrent.Refs;
  *       complete (received()). When all files for the StreamReceiveTask have been received, the sstables
  *       are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task
  *       is marked complete (taskCompleted())
- *   (b) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream
- *       (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries())
- *       by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issue a new
- *       FileMessage for that file.
+ *   (b) If during the streaming of a particular file an error occurs on the receiving end of a stream
+ *       (FileMessage.deserialize), the node will send a SessionFailedMessage to the sender and close the stream session.
  *   (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase
  *       (maybeCompleted()).
  *
@@ -149,8 +147,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
     public final ConnectionHandler handler;
 
-    private int retries;
-
     private AtomicBoolean isAborted = new AtomicBoolean(false);
     private final boolean keepSSTableLevel;
     private final boolean isIncremental;
@@ -481,11 +477,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 received(received.cfId, received.sequenceNumber);
                 break;
 
-            case RETRY:
-                RetryMessage retry = (RetryMessage) message;
-                retry(retry.cfId, retry.sequenceNumber);
-                break;
-
             case COMPLETE:
                 complete();
                 break;
@@ -610,18 +601,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     }
 
     /**
-     * Call back on receiving {@code StreamMessage.Type.RETRY} message.
-     *
-     * @param cfId ColumnFamily ID
-     * @param sequenceNumber Sequence number to indicate which file to stream again
-     */
-    public void retry(UUID cfId, int sequenceNumber)
-    {
-        OutgoingFileMessage message = transfers.get(cfId).createMessageForRetry(sequenceNumber);
-        handler.sendMessage(message);
-    }
-
-    /**
      * Check if session is completed on receiving {@code StreamMessage.Type.COMPLETE} message.
      */
     public synchronized void complete()
@@ -651,17 +630,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         closeSession(State.FAILED);
     }
 
-    public void doRetry(FileMessageHeader header, Throwable e)
-    {
-        logger.warn("[Stream #{}] Retrying for following error", planId(), e);
-        // retry
-        retries++;
-        if (retries > DatabaseDescriptor.getMaxStreamingRetries())
-            onError(new IOException("Too many retries for " + header, e));
-        else
-            handler.sendMessage(new RetryMessage(header.cfId, header.sequenceNumber));
-    }
-
     /**
      * @return Current snapshot of this session info.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 489fed9..d08ffa9 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -63,8 +63,17 @@ public class CompressedInputStream extends InputStream
     // raw checksum bytes
     private final byte[] checksumBytes = new byte[4];
 
+    /**
+     * Indicates there was a problem when reading from the source stream.
+     * When this is added to the <code>dataBuffer</code> by the stream Reader,
+     * it is expected that the <code>readException</code> variable is populated
+     * with the cause of the error when reading from the source stream, so that it
+     * is thrown to the consumer on subsequent read operations.
+     */
     private static final byte[] POISON_PILL = new byte[0];
 
+    protected volatile IOException readException = null;
+
     private long totalCompressedBytesRead;
 
     /**
@@ -84,13 +93,19 @@ public class CompressedInputStream extends InputStream
 
     public int read() throws IOException
     {
+        if (readException != null)
+            throw readException;
+
         if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
         {
             try
             {
                 byte[] compressedWithCRC = dataBuffer.take();
                 if (compressedWithCRC == POISON_PILL)
-                    throw new EOFException("No chunk available");
+                {
+                    assert readException != null;
+                    throw readException;
+                }
                 decompress(compressedWithCRC);
             }
             catch (InterruptedException e)
@@ -138,7 +153,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    static class Reader extends WrappedRunnable
+    class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -169,6 +184,7 @@ public class CompressedInputStream extends InputStream
                         int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
                         if (r < 0)
                         {
+                            readException = new EOFException("No chunk available");
                             dataBuffer.put(POISON_PILL);
                             return; // throw exception where we consume dataBuffer
                         }
@@ -177,6 +193,7 @@ public class CompressedInputStream extends InputStream
                     catch (IOException e)
                     {
                         logger.warn("Error while reading compressed input stream.", e);
+                        readException = e;
                         dataBuffer.put(POISON_PILL);
                         return; // throw exception where we consume dataBuffer
                     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index c684e4f..fa1022d 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -44,6 +44,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
 /**
  * StreamReader that reads from streamed compressed SSTable
  */
@@ -132,11 +134,9 @@ public class CompressedStreamReader extends StreamReader
                     e.addSuppressed(e2);
                 }
             }
-            drain(cis, in.getBytesRead());
-            if (e instanceof IOException)
-                throw (IOException) e;
-            else
-                throw Throwables.propagate(e);
+            if (extractIOExceptionCause(e).isPresent())
+                throw e;
+            throw Throwables.propagate(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index 31ab2a8..2870c03 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
+import com.google.common.base.Optional;
+
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamReader;
@@ -29,6 +31,8 @@ import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.compress.CompressedStreamReader;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
 /**
  * IncomingFileMessage is used to receive the part(or whole) of a SSTable data file.
  */
@@ -48,26 +52,10 @@ public class IncomingFileMessage extends StreamMessage
             {
                 return new IncomingFileMessage(reader.read(in), header);
             }
-            catch (IOException eof)
-            {
-                // Reading from remote failed(i.e. reached EOF before reading expected length of data).
-                // This can be caused by network/node failure thus we are not retrying
-                throw eof;
-            }
             catch (Throwable t)
             {
-                // Throwable can be Runtime error containing IOException.
-                // In that case we don't want to retry.
-                Throwable cause = t;
-                while ((cause = cause.getCause()) != null)
-                {
-                   if (cause instanceof IOException)
-                       throw (IOException) cause;
-                }
                 JVMStabilityInspector.inspectThrowable(t);
-                // Otherwise, we can retry
-                session.doRetry(header, t);
-                return null;
+                throw t;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
index 29e84bf..6673aa1 100644
--- a/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/RetryMessage.java
@@ -27,6 +27,10 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.UUIDSerializer;
 
+/**
+ * @deprecated retry support removed on CASSANDRA-10992
+ */
+@Deprecated
 public class RetryMessage extends StreamMessage
 {
     public static Serializer<RetryMessage> serializer = new Serializer<RetryMessage>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index a895f31..877f388 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -18,6 +18,10 @@
 */
 package org.apache.cassandra.utils;
 
+import java.io.IOException;
+
+import com.google.common.base.Optional;
+
 public class Throwables
 {
 
@@ -50,4 +54,17 @@ public class Throwables
         }
         return accumulate;
     }
+
+    public static Optional<IOException> extractIOExceptionCause(Throwable t)
+    {
+        if (t instanceof IOException)
+            return Optional.of((IOException) t);
+        Throwable cause = t;
+        while ((cause = cause.getCause()) != null)
+        {
+            if (cause instanceof IOException)
+                return Optional.of((IOException) cause);
+        }
+        return Optional.absent();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/76e3100f/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0becd18..87d93fd 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  */
@@ -44,24 +45,33 @@ public class CompressedInputStreamTest
     @Test
     public void testCompressedRead() throws Exception
     {
-        testCompressedReadWith(new long[]{0L}, false);
-        testCompressedReadWith(new long[]{1L}, false);
-        testCompressedReadWith(new long[]{100L}, false);
+        testCompressedReadWith(new long[]{0L}, false, false);
+        testCompressedReadWith(new long[]{1L}, false, false);
+        testCompressedReadWith(new long[]{100L}, false, false);
 
-        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, false);
     }
 
     @Test(expected = EOFException.class)
     public void testTruncatedRead() throws Exception
     {
-        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true, false);
+    }
+
+    /**
+     * Test that CompressedInputStream does not block if there's an exception while reading the stream
+     */
+    @Test(timeout = 30000)
+    public void testException() throws Exception
+    {
+        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false, true);
     }
 
     /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
      */
-    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
+    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate, boolean testException) throws Exception
     {
         assert valuesToCheck != null && valuesToCheck.length > 0;
 
@@ -120,6 +130,12 @@ public class CompressedInputStreamTest
 
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
+
+        if (testException)
+        {
+            testException(sections, info);
+            return;
+        }
         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
 
         try (DataInputStream in = new DataInputStream(input))
@@ -132,4 +148,25 @@ public class CompressedInputStreamTest
             }
         }
     }
+
+    private static void testException(List<Pair<Long, Long>> sections, CompressionInfo info) throws IOException
+    {
+        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(new byte[0]), info);
+
+        try (DataInputStream in = new DataInputStream(input))
+        {
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                try {
+                    in.readLong();
+                    fail("Should have thrown IOException");
+                }
+                catch (IOException e)
+                {
+                    continue;
+                }
+            }
+        }
+    }
 }