Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/01/13 20:16:53 UTC

[01/10] cassandra git commit: Fix error streaming section more than 2GB

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 c7f003291 -> 582bdba4b
  refs/heads/cassandra-3.0 95012dab5 -> 0f995a2dc
  refs/heads/cassandra-3.3 cf3dcc2bb -> a7feb80d6
  refs/heads/trunk 88255d5b1 -> 8036f04c0


Fix error streaming section more than 2GB

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10961


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/582bdba4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/582bdba4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/582bdba4

Branch: refs/heads/cassandra-2.2
Commit: 582bdba4b201e6ab8e2a9a05cff3566f1bab9dce
Parents: c7f0032
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jan 13 12:51:17 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:00:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/composites/AbstractCType.java  |  3 +-
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 26 ++++++++---
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 45 +++++++------------
 .../compress/CompressedStreamReader.java        | 32 ++++++++++----
 .../compress/CompressedStreamWriter.java        | 16 ++++++-
 .../compress/CompressedInputStreamTest.java     | 46 --------------------
 9 files changed, 86 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
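
The functional change at the heart of this patch is in CompressedStreamReader: a transfer section is a pair of long byte offsets into the SSTable data file, and the receiving side computed its length as (int) (section.right - section.left), which overflows as soon as a section spans more than 2GB (Integer.MAX_VALUE bytes). The patched code keeps the length as a long (see the hunk further down). The following is only an illustrative sketch of that overflow, with hypothetical names, not Cassandra code:

    public class SectionLengthOverflowDemo
    {
        public static void main(String[] args)
        {
            // A streamed section is described by two byte offsets into the data file.
            long sectionStart = 0L;
            long sectionEnd = 3L * 1024 * 1024 * 1024; // a 3 GB section

            int narrowed = (int) (sectionEnd - sectionStart); // pre-patch: silently overflows
            long widened = sectionEnd - sectionStart;         // post-patch: stays correct

            System.out.println("int cast:  " + narrowed); // prints -1073741824
            System.out.println("long kept: " + widened);  // prints 3221225472
        }
    }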


http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1554cf5..11f2529 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * Fix error streaming section more than 2GB (CASSANDRA-10961)
  * (cqlsh) Also apply --connect-timeout to control connection
    timeout (CASSANDRA-10959)
  * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index a982280..2190c69 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -359,7 +359,8 @@ public abstract class AbstractCType implements CType
     protected static void checkRemaining(ByteBuffer bb, int offs, int length)
     {
         if (offs + length > bb.limit())
-            throw new IllegalArgumentException("Not enough bytes");
+            throw new IllegalArgumentException(String.format("Not enough bytes. Offset: %d. Length: %d. Buffer size: %d",
+                                                             offs, length, bb.limit()));
     }
 
     private static class Serializer implements CType.Serializer
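
The enriched message above makes bounds failures during deserialization diagnosable by reporting the offending offset, length and buffer limit. A standalone sketch of the same check, using a hypothetical demo class (the method body mirrors the patched code):

    import java.nio.ByteBuffer;

    public class CheckRemainingDemo
    {
        static void checkRemaining(ByteBuffer bb, int offs, int length)
        {
            if (offs + length > bb.limit())
                throw new IllegalArgumentException(String.format(
                    "Not enough bytes. Offset: %d. Length: %d. Buffer size: %d",
                    offs, length, bb.limit()));
        }

        public static void main(String[] args)
        {
            ByteBuffer bb = ByteBuffer.allocate(16);
            checkRemaining(bb, 0, 8); // within bounds, returns normally
            try
            {
                checkRemaining(bb, 12, 8); // past the limit
            }
            catch (IllegalArgumentException e)
            {
                // Not enough bytes. Offset: 12. Length: 8. Buffer size: 16
                System.out.println(e.getMessage());
            }
        }
    }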

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 681f61e..1ec7e1c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -259,11 +259,11 @@ public class ConnectionHandler
                 {
                     // receive message
                     StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
+                    logger.debug("[Stream #{}] Received {}", session.planId(), message);
                     // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok
                     // to ignore here since we'll have asked for a retry.
                     if (message != null)
                     {
-                        logger.debug("[Stream #{}] Received {}", session.planId(), message);
                         session.messageReceived(message);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index fe3b13d..8789720 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -60,6 +60,7 @@ public class StreamReader
     protected final long repairedAt;
     protected final SSTableFormat.Type format;
     protected final int sstableLevel;
+    protected final int fileSeqNum;
 
     protected Descriptor desc;
 
@@ -73,6 +74,7 @@ public class StreamReader
         this.repairedAt = header.repairedAt;
         this.format = header.format;
         this.sstableLevel = header.sstableLevel;
+        this.fileSeqNum = header.sequenceNumber;
     }
 
     /**
@@ -83,33 +85,46 @@ public class StreamReader
     @SuppressWarnings("resource")
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt, format);
             while (in.getBytesRead() < totalSize)
             {
-                writeRow(writer, in, cfs);
+                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                writeRow(key, writer, in, cfs);
 
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                         session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
             return writer;
         } catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -167,9 +182,8 @@ public class StreamReader
         return size;
     }
 
-    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+    protected void writeRow(DecoratedKey key, SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
         writer.appendFromStream(key, cfs.metadata, in, inputVersion);
     }
 }
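
The StreamReader change hoists the partition key decode out of writeRow so that the catch block can report which partition was being read when a stream fails. Reduced to a generic capture-for-error-reporting sketch (hypothetical names, not the Cassandra API):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class CaptureKeyForErrorDemo
    {
        static void writeRow(String key) throws IOException
        {
            if (key.startsWith("bad"))
                throw new IOException("corrupt row body");
            System.out.println("wrote partition " + key);
        }

        public static void main(String[] args)
        {
            List<String> keys = Arrays.asList("k1", "k2", "bad-key");
            String key = null; // hoisted out of the loop so the error path can name it
            try
            {
                for (String k : keys)
                {
                    key = k;
                    writeRow(key);
                }
            }
            catch (IOException e)
            {
                System.out.println("Error while reading partition " + key + ": " + e.getMessage());
            }
        }
    }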

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 106677c..721ae1e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
@@ -41,6 +44,8 @@ public class StreamWriter
 {
     private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class);
+
     protected final SSTableReader sstable;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamRateLimiter limiter;
@@ -70,7 +75,8 @@ public class StreamWriter
     public void write(DataOutputStreamPlus output) throws IOException
     {
         long totalSize = totalSize();
-
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
 
         try(RandomAccessReader file = sstable.openDataReader();
             ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
@@ -109,6 +115,8 @@ public class StreamWriter
                 // make sure that current section is sent
                 compressedOutput.flush();
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index daa339a..489fed9 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -30,6 +30,9 @@ import java.util.zip.Checksum;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -38,12 +41,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
  */
 public class CompressedInputStream extends InputStream
 {
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
     private final CompressionInfo info;
     // chunk buffer
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private final byte[] buffer;
+    private byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -61,8 +67,6 @@ public class CompressedInputStream extends InputStream
 
     private long totalCompressedBytesRead;
 
-    private Thread readerThread;
-
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -73,10 +77,9 @@ public class CompressedInputStream extends InputStream
         this.checksum =  new Adler32();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 
-        readerThread = new Thread(new Reader(source, info, dataBuffer));
-        readerThread.start();
+        new Thread(new Reader(source, info, dataBuffer)).start();
     }
 
     public int read() throws IOException
@@ -135,7 +138,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    class Reader extends WrappedRunnable
+    static class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -151,7 +154,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
+            while (chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -161,43 +164,25 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r;
                     try
                     {
-                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
                         if (r < 0)
                         {
                             dataBuffer.put(POISON_PILL);
                             return; // throw exception where we consume dataBuffer
                         }
+                        bufferRead += r;
                     }
                     catch (IOException e)
                     {
+                        logger.warn("Error while reading compressed input stream.", e);
                         dataBuffer.put(POISON_PILL);
-                        throw e;
+                        return; // throw exception where we consume dataBuffer
                     }
-                    bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
-            synchronized(CompressedInputStream.this)
-            {
-                readerThread = null;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        synchronized(this)
-        {
-            if (readerThread != null)
-            {
-                readerThread.interrupt();
-                readerThread = null;
-            }
         }
     }
-
 }
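
With close() and the interruptible reader thread removed above, the Reader signals both EOF and read errors to the consuming side exclusively through the chunk queue: it enqueues POISON_PILL and returns, and the consumer raises the failure when it dequeues the pill. A minimal, self-contained sketch of that hand-off (hypothetical names, not the Cassandra classes):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PoisonPillDemo
    {
        // Sentinel chunk telling the consumer that the producer stopped (EOF or error).
        static final byte[] POISON_PILL = new byte[0];

        public static void main(String[] args) throws InterruptedException
        {
            final BlockingQueue<byte[]> dataBuffer = new ArrayBlockingQueue<>(1024);

            // Producer: instead of letting an error die on its own thread, it enqueues the pill.
            new Thread(new Runnable()
            {
                public void run()
                {
                    try
                    {
                        dataBuffer.put(new byte[]{1, 2, 3}); // one successfully read chunk
                        dataBuffer.put(POISON_PILL);         // pretend the source hit EOF/IOException here
                    }
                    catch (InterruptedException ignore) {}
                }
            }).start();

            // Consumer: the failure surfaces where the chunks are consumed.
            while (true)
            {
                byte[] chunk = dataBuffer.take();
                if (chunk == POISON_PILL)
                {
                    System.out.println("reader signalled EOF/error; throw EOFException here on the consuming thread");
                    break;
                }
                System.out.println("received chunk of " + chunk.length + " bytes");
            }
        }
    }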

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 2277943..c684e4f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 import org.slf4j.Logger;
@@ -34,10 +35,12 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
@@ -64,44 +67,59 @@ public class CompressedStreamReader extends StreamReader
     @SuppressWarnings("resource")
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt, format);
+            int sectionIdx = 0;
             for (Pair<Long, Long> section : sections)
             {
                 assert cis.getTotalCompressedBytesRead() <= totalSize;
-                int sectionLength = (int) (section.right - section.left);
+                long sectionLength = section.right - section.left;
 
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
                 // skip to beginning of section inside chunk
                 cis.position(section.left);
                 in.reset(0);
 
                 while (in.getBytesRead() < sectionLength)
                 {
-                    writeRow(writer, in, cfs);
+                    key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                    writeRow(key, writer, in, cfs);
 
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
                     session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, cis.getTotalCompressedBytesRead(), totalSize);
             return writer;
         }
         catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -120,10 +138,6 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
-        finally
-        {
-            cis.close();
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 144980c..99e9bd6 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -25,11 +25,13 @@ import java.util.List;
 
 import com.google.common.base.Function;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamSession;
@@ -43,6 +45,8 @@ public class CompressedStreamWriter extends StreamWriter
 {
     public static final int CHUNK_SIZE = 10 * 1024 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
+
     private final CompressionInfo compressionInfo;
 
     public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
@@ -55,16 +59,24 @@ public class CompressedStreamWriter extends StreamWriter
     public void write(DataOutputStreamPlus out) throws IOException
     {
         long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
         try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
         {
             long progress = 0L;
             // calculate chunks to transfer. we want to send continuous chunks altogether.
             List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+
+            int sectionIdx = 0;
+
             // stream each of the required sections of the file
             for (final Pair<Long, Long> section : sections)
             {
                 // length of the section to stream
                 long length = section.right - section.left;
+
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
                 // tracks write progress
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
@@ -84,6 +96,8 @@ public class CompressedStreamWriter extends StreamWriter
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index e692441..0becd18 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -58,52 +58,6 @@ public class CompressedInputStreamTest
     }
 
     /**
-     * Test CompressedInputStream not hang when closed while reading
-     * @throws IOException
-     */
-    @Test(expected = EOFException.class)
-    public void testClose() throws IOException
-    {
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
-        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-        InputStream blockingInput = new InputStream()
-        {
-            @Override
-            public int read() throws IOException
-            {
-                try
-                {
-                    // 10 second cut off not to stop other test in case
-                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException("Interrupted as expected", e);
-                }
-            }
-        };
-        CompressionInfo info = new CompressionInfo(chunks, param);
-        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
-        {
-            new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        cis.close();
-                    }
-                    catch (Exception ignore) {}
-                }
-            }).start();
-            // block here
-            cis.read();
-        }
-    }
-
-    /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
      */


[03/10] cassandra git commit: Fix error streaming section more than 2GB

Posted by yu...@apache.org.
Fix error streaming section more than 2GB

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10961


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/582bdba4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/582bdba4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/582bdba4

Branch: refs/heads/cassandra-3.3
Commit: 582bdba4b201e6ab8e2a9a05cff3566f1bab9dce
Parents: c7f0032
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jan 13 12:51:17 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:00:11 2016 -0600


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.3

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.3


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7feb80d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7feb80d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7feb80d

Branch: refs/heads/trunk
Commit: a7feb80d6bd90a8f8041741e561282e6888a78f2
Parents: cf3dcc2 0f995a2
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:16:24 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:16:24 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 19 ++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index fd05c01,614d5b4..aae5efe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -7,28 -4,6 +7,29 @@@ Merged from 3.0
     tombstone (CASSANDRA-10743)
   * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
   * Fix potential assertion error during compaction (CASSANDRA-10944)
 +Merged from 2.2:
++ * Fix error streaming section more than 2GB (CASSANDRA-10961)
 + * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)
 + * Enable GC logging by default (CASSANDRA-10140)
 + * Optimize pending range computation (CASSANDRA-9258)
 + * Skip commit log and saved cache directories in SSTable version startup check (CASSANDRA-10902)
 + * drop/alter user should be case sensitive (CASSANDRA-10817)
 +Merged from 2.1:
 + * (cqlsh) Add request timeout option to cqlsh (CASSANDRA-10686)
 + * Avoid AssertionError while submitting hint with LWT (CASSANDRA-10477)
 + * If CompactionMetadata is not in stats file, use index summary instead (CASSANDRA-10676)
 + * Retry sending gossip syn multiple times during shadow round (CASSANDRA-8072)
 + * Fix pending range calculation during moves (CASSANDRA-10887)
 + * Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-8708)
 +
 +
 +3.2
 + * Make sure tokens don't exist in several data directories (CASSANDRA-6696)
 + * Add requireAuthorization method to IAuthorizer (CASSANDRA-10852)
 + * Move static JVM options to conf/jvm.options file (CASSANDRA-10494)
 + * Fix CassandraVersion to accept x.y version string (CASSANDRA-10931)
 + * Add forceUserDefinedCleanup to allow more flexible cleanup (CASSANDRA-10708)
 + * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494)
   * Fix counting of received sstables in streaming (CASSANDRA-10949)
   * Implement hints compression (CASSANDRA-9428)
   * Fix potential assertion error when reading static columns (CASSANDRA-10903)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 61eb13f,268f974..838664d
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -62,7 -62,10 +62,8 @@@ public class StreamReade
      protected final SSTableFormat.Type format;
      protected final int sstableLevel;
      protected final SerializationHeader.Component header;
+     protected final int fileSeqNum;
  
 -    protected Descriptor desc;
 -
      public StreamReader(FileMessageHeader header, StreamSession session)
      {
          this.session = session;
@@@ -106,8 -116,10 +114,10 @@@
              {
                  writePartition(deserializer, writer);
                  // TODO move this to BytesReadTracker
 -                session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
 +                session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                          session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
              return writer;
          }
          catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index c123102,5210d5b..5a47787
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,8 -24,8 +24,9 @@@ import java.nio.channels.ReadableByteCh
  
  import com.google.common.base.Throwables;
  
+ import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -96,9 -104,11 +105,11 @@@ public class CompressedStreamReader ext
                  {
                      writePartition(deserializer, writer);
                      // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
 -                    session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
 +                    session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                  }
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                          session.peer, cis.getTotalCompressedBytesRead(), totalSize);
              return writer;
          }
          catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 93e0f76,f37af29..657da88
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@@ -25,8 -25,10 +25,11 @@@ import java.util.List
  
  import com.google.common.base.Function;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.ChannelProxy;
  import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@@ -76,9 -88,11 +89,11 @@@ public class CompressedStreamWriter ext
                      long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
                      bytesTransferred += lastWrite;
                      progress += lastWrite;
 -                    session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
 +                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
                  }
              }
+             logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                          session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
          }
      }
  


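The CompressedStreamWriter hunk above is where the "section more than 2GB" failure from the commit subject is addressed: the loop shown passes a per-iteration toTransfer amount and an advancing offset (section.left + bytesTransferredFinal), so a section is moved through the channel in several transfer calls rather than one. A minimal sketch of that general pattern against a plain java.nio FileChannel follows; CHUNK_SIZE, the class name and the method signature are illustrative assumptions, not the actual Cassandra fields.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

// Sketch only: stream one file section in bounded slices so that a single
// transferTo call never has to cover an arbitrarily large (>2GB) section.
final class SectionStreamer
{
    private static final long CHUNK_SIZE = 10L * 1024 * 1024; // assumed slice size

    static long streamSection(FileChannel fc, WritableByteChannel out,
                              long sectionStart, long sectionLength) throws IOException
    {
        long bytesTransferred = 0;
        while (bytesTransferred < sectionLength)
        {
            long toTransfer = Math.min(CHUNK_SIZE, sectionLength - bytesTransferred);
            long lastWrite = fc.transferTo(sectionStart + bytesTransferred, toTransfer, out);
            if (lastWrite <= 0)
                break; // nothing written; let the caller deal with the shortfall
            bytesTransferred += lastWrite;
            // a real implementation would report progress after each slice,
            // as the diff does with session.progress(..., progress, totalSize)
        }
        return bytesTransferred;
    }
}
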
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f995a2d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f995a2d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f995a2d

Branch: refs/heads/cassandra-3.3
Commit: 0f995a2dc7a116ec6def110e10af6bb9acc9f7b3
Parents: 95012da 582bdba
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:13:11 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:13:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 20 +++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 38786c1,11f2529..614d5b4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 -2.2.5
 +3.0.3
 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
 + * Fix UnsupportedOperationException when reading old sstable with range
 +   tombstone (CASSANDRA-10743)
 + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
 + * Fix potential assertion error during compaction (CASSANDRA-10944)
 + * Fix counting of received sstables in streaming (CASSANDRA-10949)
 + * Implement hints compression (CASSANDRA-9428)
 + * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 + * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 +Merged from 2.2:
+  * Fix error streaming section more than 2GB (CASSANDRA-10961)
   * (cqlsh) Also apply --connect-timeout to control connection
     timeout (CASSANDRA-10959)
   * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 87dcda0,8789720..268f974
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -61,7 -60,7 +61,8 @@@ public class StreamReade
      protected final long repairedAt;
      protected final SSTableFormat.Type format;
      protected final int sstableLevel;
 +    protected final SerializationHeader.Component header;
+     protected final int fileSeqNum;
  
      protected Descriptor desc;
  
@@@ -75,7 -74,7 +76,8 @@@
          this.repairedAt = header.repairedAt;
          this.format = header.format;
          this.sstableLevel = header.sstableLevel;
 +        this.header = header.header;
+         this.fileSeqNum = header.sequenceNumber;
      }
  
      /**
@@@ -83,10 -82,9 +85,9 @@@
       * @return SSTable transferred
       * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
 -    @SuppressWarnings("resource")
 -    public SSTableWriter read(ReadableByteChannel channel) throws IOException
 +    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
 +    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
          long totalSize = totalSize();
  
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -110,13 -117,25 +118,18 @@@
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                          session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
              return writer;
 -        } catch (Throwable e)
 +        }
 +        catch (Throwable e)
          {
 -            if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
@@@ -126,16 -145,14 +139,15 @@@
          }
      }
  
 -    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
 +    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
      {
 -        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
 +        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
              throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
- 
 -        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 +        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
  
 -        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
 +
 +        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
      }
  
      protected void drain(InputStream dis, long bytesRead) throws IOException

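In the error path above the reader logs the failing partition, aborts the writer, and then calls drain(dis, in.getBytesRead()); the comment removed by this diff ("continue so we can drain unread stream") suggests the point is to consume the unread remainder of the current file before the session moves on. A rough sketch of what such a helper can look like; the signature and names are placeholders, not the actual implementation.

import java.io.IOException;
import java.io.InputStream;

// Sketch: skip the unread remainder of the current file so the underlying
// stream stays positioned at the start of the next file in the session.
final class StreamDrain
{
    static void drain(InputStream in, long bytesAlreadyRead, long totalSize) throws IOException
    {
        long toSkip = totalSize - bytesAlreadyRead;
        while (toSkip > 0)
        {
            long skipped = in.skip(toSkip);
            if (skipped <= 0)
            {
                // skip() may return 0 on some streams; fall back to a blocking read
                if (in.read() < 0)
                    break; // stream ended early, nothing left to drain
                skipped = 1;
            }
            toSkip -= skipped;
        }
    }
}
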
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 56dc63a,489fed9..55ac7ac
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -30,8 -30,10 +30,11 @@@ import java.util.zip.Checksum
  import com.google.common.collect.Iterators;
  import com.google.common.primitives.Ints;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.utils.ChecksumType;
  import org.apache.cassandra.utils.WrappedRunnable;
  
  /**
@@@ -69,17 -71,15 +73,16 @@@ public class CompressedInputStream exte
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum =  new Adler32();
 +        this.checksum =  checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
 -        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 +        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
  
-         readerThread = new Thread(new Reader(source, info, dataBuffer));
-         readerThread.start();
+         new Thread(new Reader(source, info, dataBuffer)).start();
      }
  
      public int read() throws IOException

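The constructor above bounds the hand-off between the thread that reads compressed data off the wire and the consumer that decompresses it: chunks go through an ArrayBlockingQueue capped at min(number of chunks, 1024), filled by a dedicated reader thread. A self-contained sketch of that producer/consumer shape, leaving out the decompression, checksum and crcCheckChance handling the real stream performs; all names here are illustrative assumptions.

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a background thread reads fixed-size compressed chunks from the
// source and hands them to the consumer through a bounded queue, so memory
// stays capped no matter how large the streamed file is.
final class BoundedChunkReader
{
    private final BlockingQueue<byte[]> chunks;

    BoundedChunkReader(InputStream source, int chunkLength, int chunkCount)
    {
        // capacity mirrors the pattern above: at most 1024 buffered chunks
        this.chunks = new ArrayBlockingQueue<>(Math.min(Math.max(chunkCount, 1), 1024));
        Thread reader = new Thread(() -> {
            try
            {
                byte[] chunk = new byte[chunkLength];
                int read;
                while ((read = source.read(chunk)) > 0)
                {
                    byte[] copy = new byte[read];
                    System.arraycopy(chunk, 0, copy, 0, read);
                    chunks.put(copy); // blocks once the queue is full
                }
            }
            catch (IOException | InterruptedException e)
            {
                Thread.currentThread().interrupt(); // give up quietly in this sketch
            }
        }, "chunk-reader");
        reader.setDaemon(true);
        reader.start();
    }

    byte[] nextChunk() throws InterruptedException
    {
        return chunks.take(); // blocks until the reader has produced a chunk
    }
}
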
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4d10244,c684e4f..5210d5b
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,7 -25,8 +24,8 @@@ import java.nio.channels.ReadableByteCh
  
  import com.google.common.base.Throwables;
  
+ import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -60,10 -64,9 +61,9 @@@ public class CompressedStreamReader ext
       * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
      @Override
 -    @SuppressWarnings("resource")
 -    public SSTableWriter read(ReadableByteChannel channel) throws IOException
 +    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
 +    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
          long totalSize = totalSize();
  
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -72,13 -79,15 +76,16 @@@
              // schema was dropped during streaming
              throw new IOException("CF " + cfId + " was dropped during streaming");
          }
-         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ 
+         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                      cfs.getColumnFamilyName());
  
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
 +                                                              inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
 -        SSTableWriter writer = null;
 -        DecoratedKey key = null;
 +        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
 +        SSTableMultiWriter writer = null;
          try
          {
              writer = createWriter(cfs, totalSize, repairedAt, format);
@@@ -102,9 -117,20 +113,12 @@@
          }
          catch (Throwable e)
          {
 -            if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
              drain(cis, in.getBytesRead());
              if (e instanceof IOException)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index adbd091,99e9bd6..f37af29
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@@ -55,7 -59,9 +60,9 @@@ public class CompressedStreamWriter ext
      public void write(DataOutputStreamPlus out) throws IOException
      {
          long totalSize = totalSize();
+         logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                      sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
 -        try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
 +        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
          {
              long progress = 0L;
              // calculate chunks to transfer. we want to send continuous chunks altogether.

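The context line above ("calculate chunks to transfer. we want to send continuous chunks altogether.") describes the sender-side idea: the compressed chunks covering the requested sections are merged into continuous byte ranges so each range can be transferred in as few calls as possible (the test further below shows that on disk every chunk also carries a 4-byte CRC). A hedged sketch of that coalescing step; Chunk and Run are stand-ins, not Cassandra's types.

import java.util.ArrayList;
import java.util.List;

// Sketch: merge chunks that are adjacent on disk into continuous runs.
final class ChunkCoalescer
{
    static final class Chunk
    {
        final long offset; final int length;
        Chunk(long offset, int length) { this.offset = offset; this.length = length; }
    }

    static final class Run
    {
        final long offset; final long length;
        Run(long offset, long length) { this.offset = offset; this.length = length; }
    }

    static List<Run> coalesce(List<Chunk> chunks)
    {
        List<Run> runs = new ArrayList<>();
        long start = -1, length = 0;
        for (Chunk c : chunks)
        {
            if (start != -1 && c.offset == start + length)
            {
                length += c.length;        // adjacent on disk: extend the current run
            }
            else
            {
                if (start != -1)
                    runs.add(new Run(start, length));
                start = c.offset;          // start a new run
                length = c.length;
            }
        }
        if (start != -1)
            runs.add(new Run(start, length));
        return runs;
    }
}
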
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 2162e32,0000000..a3300ac
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,183 -1,0 +1,137 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
 +
 +    /**
-      * Test CompressedInputStream not hang when closed while reading
-      * @throws IOException
-      */
-     @Test(expected = EOFException.class)
-     public void testClose() throws IOException
-     {
-         CompressionParams param = CompressionParams.snappy(32);
-         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-         InputStream blockingInput = new InputStream()
-         {
-             @Override
-             public int read() throws IOException
-             {
-                 try
-                 {
-                     // 10 second cut off not to stop other test in case
-                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                 }
-                 catch (InterruptedException e)
-                 {
-                     throw new IOException("Interrupted as expected", e);
-                 }
-             }
-         };
-         CompressionInfo info = new CompressionInfo(chunks, param);
-         try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
-         {
-             new Thread(new Runnable()
-             {
-                 @Override
-                 public void run()
-                 {
-                     try
-                     {
-                         cis.close();
-                     }
-                     catch (Exception ignore) {}
-                 }
-             }).start();
-             // block here
-             cis.read();
-         }
-     }
- 
-     /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
 +        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
 +            for (long l = 0L; l < 1000; l++)
 +            {
 +                index.put(l, writer.position());
 +                writer.writeLong(l);
 +            }
 +            writer.finish();
 +        }
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
 +        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
 +        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
 +            int pos = 0;
 +            for (CompressionMetadata.Chunk c : chunks)
 +            {
 +                f.seek(c.offset);
 +                pos += f.read(toRead, pos, c.length + 4);
 +            }
 +        }
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
 +
 +        try (DataInputStream in = new DataInputStream(input))
 +        {
 +            for (int i = 0; i < sections.size(); i++)
 +            {
 +                input.position(sections.get(i).left);
 +                long readValue = in.readLong();
 +                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
 +            }
 +        }
 +    }
 +}


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.3

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.3


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7feb80d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7feb80d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7feb80d

Branch: refs/heads/cassandra-3.3
Commit: a7feb80d6bd90a8f8041741e561282e6888a78f2
Parents: cf3dcc2 0f995a2
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:16:24 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:16:24 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 19 ++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index fd05c01,614d5b4..aae5efe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -7,28 -4,6 +7,29 @@@ Merged from 3.0
     tombstone (CASSANDRA-10743)
   * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
   * Fix potential assertion error during compaction (CASSANDRA-10944)
 +Merged from 2.2:
++ * Fix error streaming section more than 2GB (CASSANDRA-10961)
 + * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)
 + * Enable GC logging by default (CASSANDRA-10140)
 + * Optimize pending range computation (CASSANDRA-9258)
 + * Skip commit log and saved cache directories in SSTable version startup check (CASSANDRA-10902)
 + * drop/alter user should be case sensitive (CASSANDRA-10817)
 +Merged from 2.1:
 + * (cqlsh) Add request timeout option to cqlsh (CASSANDRA-10686)
 + * Avoid AssertionError while submitting hint with LWT (CASSANDRA-10477)
 + * If CompactionMetadata is not in stats file, use index summary instead (CASSANDRA-10676)
 + * Retry sending gossip syn multiple times during shadow round (CASSANDRA-8072)
 + * Fix pending range calculation during moves (CASSANDRA-10887)
 + * Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-8708)
 +
 +
 +3.2
 + * Make sure tokens don't exist in several data directories (CASSANDRA-6696)
 + * Add requireAuthorization method to IAuthorizer (CASSANDRA-10852)
 + * Move static JVM options to conf/jvm.options file (CASSANDRA-10494)
 + * Fix CassandraVersion to accept x.y version string (CASSANDRA-10931)
 + * Add forceUserDefinedCleanup to allow more flexible cleanup (CASSANDRA-10708)
 + * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494)
   * Fix counting of received sstables in streaming (CASSANDRA-10949)
   * Implement hints compression (CASSANDRA-9428)
   * Fix potential assertion error when reading static columns (CASSANDRA-10903)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 61eb13f,268f974..838664d
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -62,7 -62,10 +62,8 @@@ public class StreamReade
      protected final SSTableFormat.Type format;
      protected final int sstableLevel;
      protected final SerializationHeader.Component header;
+     protected final int fileSeqNum;
  
 -    protected Descriptor desc;
 -
      public StreamReader(FileMessageHeader header, StreamSession session)
      {
          this.session = session;
@@@ -106,8 -116,10 +114,10 @@@
              {
                  writePartition(deserializer, writer);
                  // TODO move this to BytesReadTracker
 -                session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
 +                session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                          session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
              return writer;
          }
          catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index c123102,5210d5b..5a47787
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,8 -24,8 +24,9 @@@ import java.nio.channels.ReadableByteCh
  
  import com.google.common.base.Throwables;
  
+ import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -96,9 -104,11 +105,11 @@@ public class CompressedStreamReader ext
                  {
                      writePartition(deserializer, writer);
                      // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
 -                    session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
 +                    session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                  }
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                          session.peer, cis.getTotalCompressedBytesRead(), totalSize);
              return writer;
          }
          catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 93e0f76,f37af29..657da88
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@@ -25,8 -25,10 +25,11 @@@ import java.util.List
  
  import com.google.common.base.Function;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.ChannelProxy;
  import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@@ -76,9 -88,11 +89,11 @@@ public class CompressedStreamWriter ext
                      long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
                      bytesTransferred += lastWrite;
                      progress += lastWrite;
 -                    session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
 +                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
                  }
              }
+             logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                          session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
          }
      }
  


[10/10] cassandra git commit: Merge branch 'cassandra-3.3' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.3' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8036f04c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8036f04c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8036f04c

Branch: refs/heads/trunk
Commit: 8036f04c08385fadc4c527fca5b3478171ae31dc
Parents: 88255d5 a7feb80
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:16:32 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:16:32 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 19 ++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8036f04c/CHANGES.txt
----------------------------------------------------------------------


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f995a2d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f995a2d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f995a2d

Branch: refs/heads/trunk
Commit: 0f995a2dc7a116ec6def110e10af6bb9acc9f7b3
Parents: 95012da 582bdba
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:13:11 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:13:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 20 +++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 38786c1,11f2529..614d5b4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 -2.2.5
 +3.0.3
 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
 + * Fix UnsupportedOperationException when reading old sstable with range
 +   tombstone (CASSANDRA-10743)
 + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
 + * Fix potential assertion error during compaction (CASSANDRA-10944)
 + * Fix counting of received sstables in streaming (CASSANDRA-10949)
 + * Implement hints compression (CASSANDRA-9428)
 + * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 + * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 +Merged from 2.2:
+  * Fix error streaming section more than 2GB (CASSANDRA-10961)
   * (cqlsh) Also apply --connect-timeout to control connection
     timeout (CASSANDRA-10959)
   * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 87dcda0,8789720..268f974
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -61,7 -60,7 +61,8 @@@ public class StreamReade
      protected final long repairedAt;
      protected final SSTableFormat.Type format;
      protected final int sstableLevel;
 +    protected final SerializationHeader.Component header;
+     protected final int fileSeqNum;
  
      protected Descriptor desc;
  
@@@ -75,7 -74,7 +76,8 @@@
          this.repairedAt = header.repairedAt;
          this.format = header.format;
          this.sstableLevel = header.sstableLevel;
 +        this.header = header.header;
+         this.fileSeqNum = header.sequenceNumber;
      }
  
      /**
@@@ -83,10 -82,9 +85,9 @@@
       * @return SSTable transferred
       * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
 -    @SuppressWarnings("resource")
 -    public SSTableWriter read(ReadableByteChannel channel) throws IOException
 +    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
 +    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
          long totalSize = totalSize();
  
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -110,13 -117,25 +118,18 @@@
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                          session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
              return writer;
 -        } catch (Throwable e)
 +        }
 +        catch (Throwable e)
          {
 -            if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
@@@ -126,16 -145,14 +139,15 @@@
          }
      }
  
 -    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
 +    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
      {
 -        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
 +        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
              throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
- 
 -        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 +        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
  
 -        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
 +
 +        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
      }
  
      protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 56dc63a,489fed9..55ac7ac
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -30,8 -30,10 +30,11 @@@ import java.util.zip.Checksum
  import com.google.common.collect.Iterators;
  import com.google.common.primitives.Ints;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.utils.ChecksumType;
  import org.apache.cassandra.utils.WrappedRunnable;
  
  /**
@@@ -69,17 -71,15 +73,16 @@@ public class CompressedInputStream exte
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum =  new Adler32();
 +        this.checksum =  checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
 -        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 +        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
  
-         readerThread = new Thread(new Reader(source, info, dataBuffer));
-         readerThread.start();
+         new Thread(new Reader(source, info, dataBuffer)).start();
      }
  
      public int read() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4d10244,c684e4f..5210d5b
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,7 -25,8 +24,8 @@@ import java.nio.channels.ReadableByteCh
  
  import com.google.common.base.Throwables;
  
+ import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -60,10 -64,9 +61,9 @@@ public class CompressedStreamReader ext
       * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
      @Override
 -    @SuppressWarnings("resource")
 -    public SSTableWriter read(ReadableByteChannel channel) throws IOException
 +    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
 +    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
          long totalSize = totalSize();
  
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -72,13 -79,15 +76,16 @@@
              // schema was dropped during streaming
              throw new IOException("CF " + cfId + " was dropped during streaming");
          }
-         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ 
+         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                      cfs.getColumnFamilyName());
  
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
 +                                                              inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
 -        SSTableWriter writer = null;
 -        DecoratedKey key = null;
 +        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
 +        SSTableMultiWriter writer = null;
          try
          {
              writer = createWriter(cfs, totalSize, repairedAt, format);
@@@ -102,9 -117,20 +113,12 @@@
          }
          catch (Throwable e)
          {
 -            if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
              drain(cis, in.getBytesRead());
              if (e instanceof IOException)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index adbd091,99e9bd6..f37af29
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@@ -55,7 -59,9 +60,9 @@@ public class CompressedStreamWriter ext
      public void write(DataOutputStreamPlus out) throws IOException
      {
          long totalSize = totalSize();
+         logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                      sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
 -        try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
 +        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
          {
              long progress = 0L;
              // calculate chunks to transfer. we want to send continuous chunks altogether.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 2162e32,0000000..a3300ac
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,183 -1,0 +1,137 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
 +
 +    /**
-      * Test CompressedInputStream not hang when closed while reading
-      * @throws IOException
-      */
-     @Test(expected = EOFException.class)
-     public void testClose() throws IOException
-     {
-         CompressionParams param = CompressionParams.snappy(32);
-         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-         InputStream blockingInput = new InputStream()
-         {
-             @Override
-             public int read() throws IOException
-             {
-                 try
-                 {
-                     // 10 second cut off not to stop other test in case
-                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                 }
-                 catch (InterruptedException e)
-                 {
-                     throw new IOException("Interrupted as expected", e);
-                 }
-             }
-         };
-         CompressionInfo info = new CompressionInfo(chunks, param);
-         try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
-         {
-             new Thread(new Runnable()
-             {
-                 @Override
-                 public void run()
-                 {
-                     try
-                     {
-                         cis.close();
-                     }
-                     catch (Exception ignore) {}
-                 }
-             }).start();
-             // block here
-             cis.read();
-         }
-     }
- 
-     /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
 +        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
 +            for (long l = 0L; l < 1000; l++)
 +            {
 +                index.put(l, writer.position());
 +                writer.writeLong(l);
 +            }
 +            writer.finish();
 +        }
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
 +        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
 +        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
 +            int pos = 0;
 +            for (CompressionMetadata.Chunk c : chunks)
 +            {
 +                f.seek(c.offset);
 +                pos += f.read(toRead, pos, c.length + 4);
 +            }
 +        }
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
 +
 +        try (DataInputStream in = new DataInputStream(input))
 +        {
 +            for (int i = 0; i < sections.size(); i++)
 +            {
 +                input.position(sections.get(i).left);
 +                long readValue = in.readLong();
 +                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
 +            }
 +        }
 +    }
 +}
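
The expectedSize assertion in the test above hinges on the on-wire layout: each compressed chunk is followed by a 4-byte CRC, so the bytes needed for a set of sections is the sum of (chunk length + 4). Below is a minimal, self-contained sketch of that arithmetic; the Chunk class is a simplified stand-in for CompressionMetadata.Chunk, not the real type.

    // Sketch: total on-wire size of selected chunks = sum of (chunk length + 4-byte CRC).
    public class ChunkSizeSketch
    {
        static final class Chunk
        {
            final long offset;   // position of the compressed chunk in the data file
            final int length;    // compressed payload length, excluding the CRC
            Chunk(long offset, int length) { this.offset = offset; this.length = length; }
        }

        static long totalSizeWithCrc(Chunk[] chunks)
        {
            long total = 0;
            for (Chunk c : chunks)
                total += c.length + 4; // chunk payload plus its 4-byte checksum
            return total;
        }

        public static void main(String[] args)
        {
            Chunk[] chunks = { new Chunk(0, 32), new Chunk(36, 32) };
            System.out.println(totalSizeWithCrc(chunks)); // prints 72
        }
    }

With two 32-byte chunks this prints 72, the same quantity the test compares against comp.getTotalSizeForSections(sections).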


[02/10] cassandra git commit: Fix error streaming section more than 2GB

Posted by yu...@apache.org.
Fix error streaming section more than 2GB

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10961


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/582bdba4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/582bdba4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/582bdba4

Branch: refs/heads/cassandra-3.0
Commit: 582bdba4b201e6ab8e2a9a05cff3566f1bab9dce
Parents: c7f0032
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jan 13 12:51:17 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:00:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/composites/AbstractCType.java  |  3 +-
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 26 ++++++++---
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 45 +++++++------------
 .../compress/CompressedStreamReader.java        | 32 ++++++++++----
 .../compress/CompressedStreamWriter.java        | 16 ++++++-
 .../compress/CompressedInputStreamTest.java     | 46 --------------------
 9 files changed, 86 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1554cf5..11f2529 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * Fix error streaming section more than 2GB (CASSANDRA-10961)
  * (cqlsh) Also apply --connect-timeout to control connection
    timeout (CASSANDRA-10959)
  * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index a982280..2190c69 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -359,7 +359,8 @@ public abstract class AbstractCType implements CType
     protected static void checkRemaining(ByteBuffer bb, int offs, int length)
     {
         if (offs + length > bb.limit())
-            throw new IllegalArgumentException("Not enough bytes");
+            throw new IllegalArgumentException(String.format("Not enough bytes. Offset: %d. Length: %d. Buffer size: %d",
+                                                             offs, length, bb.limit()));
     }
 
     private static class Serializer implements CType.Serializer
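
The point of the richer message is that a bare "Not enough bytes" gives nothing to correlate with the failing read, while offset, length and buffer size do. A standalone sketch of the same guard and the message it produces (the class name is hypothetical):

    import java.nio.ByteBuffer;

    public class CheckRemainingSketch
    {
        static void checkRemaining(ByteBuffer bb, int offs, int length)
        {
            if (offs + length > bb.limit())
                throw new IllegalArgumentException(String.format("Not enough bytes. Offset: %d. Length: %d. Buffer size: %d",
                                                                 offs, length, bb.limit()));
        }

        public static void main(String[] args)
        {
            ByteBuffer bb = ByteBuffer.allocate(12);
            checkRemaining(bb, 4, 8);  // fine: 4 + 8 <= 12
            checkRemaining(bb, 10, 8); // throws: "Not enough bytes. Offset: 10. Length: 8. Buffer size: 12"
        }
    }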

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 681f61e..1ec7e1c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -259,11 +259,11 @@ public class ConnectionHandler
                 {
                     // receive message
                     StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
+                    logger.debug("[Stream #{}] Received {}", session.planId(), message);
                     // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok
                     // to ignore here since we'll have asked for a retry.
                     if (message != null)
                     {
-                        logger.debug("[Stream #{}] Received {}", session.planId(), message);
                         session.messageReceived(message);
                     }
                 }
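
Moving the debug line ahead of the null check means a message that deserialized to null (and will be retried) still leaves a trace in the log. A tiny sketch of that ordering; onMessage and handle are hypothetical stand-ins for the receive loop above:

    public class LogBeforeNullCheckSketch
    {
        public static void main(String[] args)
        {
            onMessage("PrepareMessage");
            onMessage(null); // still logged, even though it is not dispatched
        }

        static void onMessage(Object message)
        {
            System.out.println("Received " + message); // log first (stand-in for logger.debug)
            if (message != null)
                handle(message);
        }

        static void handle(Object message)
        {
            System.out.println("Dispatched " + message);
        }
    }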

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index fe3b13d..8789720 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -60,6 +60,7 @@ public class StreamReader
     protected final long repairedAt;
     protected final SSTableFormat.Type format;
     protected final int sstableLevel;
+    protected final int fileSeqNum;
 
     protected Descriptor desc;
 
@@ -73,6 +74,7 @@ public class StreamReader
         this.repairedAt = header.repairedAt;
         this.format = header.format;
         this.sstableLevel = header.sstableLevel;
+        this.fileSeqNum = header.sequenceNumber;
     }
 
     /**
@@ -83,33 +85,46 @@ public class StreamReader
     @SuppressWarnings("resource")
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt, format);
             while (in.getBytesRead() < totalSize)
             {
-                writeRow(writer, in, cfs);
+                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                writeRow(key, writer, in, cfs);
 
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                         session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
             return writer;
         } catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -167,9 +182,8 @@ public class StreamReader
         return size;
     }
 
-    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+    protected void writeRow(DecoratedKey key, SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
         writer.appendFromStream(key, cfs.metadata, in, inputVersion);
     }
 }
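
The StreamReader change above hoists the partition-key read out of writeRow() so that, when reading a partition body fails, the catch block can say which key was in flight. A simplified sketch of the same pattern using plain JDK streams rather than the Cassandra types:

    import java.io.ByteArrayInputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class KeyedReadSketch
    {
        public static void main(String[] args)
        {
            // a short-length-prefixed key "key", followed by a deliberately truncated body
            byte[] stream = { 0, 3, 'k', 'e', 'y' };
            DataInput in = new DataInputStream(new ByteArrayInputStream(stream));
            String key = null;
            try
            {
                key = readKey(in);    // read the key first...
                consumePartition(in); // ...so a failure here can be attributed to it
            }
            catch (IOException e)
            {
                // key is non-null if the body, not the key, was the part that failed
                System.err.println("Error while reading partition " + key + ": " + e);
            }
        }

        static String readKey(DataInput in) throws IOException
        {
            int len = in.readShort();
            byte[] b = new byte[len];
            in.readFully(b);
            return new String(b, StandardCharsets.UTF_8);
        }

        static void consumePartition(DataInput in) throws IOException
        {
            in.readByte(); // the body is missing, so this throws EOFException
        }
    }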

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 106677c..721ae1e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
@@ -41,6 +44,8 @@ public class StreamWriter
 {
     private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class);
+
     protected final SSTableReader sstable;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamRateLimiter limiter;
@@ -70,7 +75,8 @@ public class StreamWriter
     public void write(DataOutputStreamPlus output) throws IOException
     {
         long totalSize = totalSize();
-
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
 
         try(RandomAccessReader file = sstable.openDataReader();
             ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
@@ -109,6 +115,8 @@ public class StreamWriter
                 // make sure that current section is sent
                 compressedOutput.flush();
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index daa339a..489fed9 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -30,6 +30,9 @@ import java.util.zip.Checksum;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -38,12 +41,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
  */
 public class CompressedInputStream extends InputStream
 {
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
     private final CompressionInfo info;
     // chunk buffer
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private final byte[] buffer;
+    private byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -61,8 +67,6 @@ public class CompressedInputStream extends InputStream
 
     private long totalCompressedBytesRead;
 
-    private Thread readerThread;
-
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -73,10 +77,9 @@ public class CompressedInputStream extends InputStream
         this.checksum =  new Adler32();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 
-        readerThread = new Thread(new Reader(source, info, dataBuffer));
-        readerThread.start();
+        new Thread(new Reader(source, info, dataBuffer)).start();
     }
 
     public int read() throws IOException
@@ -135,7 +138,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    class Reader extends WrappedRunnable
+    static class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -151,7 +154,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
+            while (chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -161,43 +164,25 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r;
                     try
                     {
-                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
                         if (r < 0)
                         {
                             dataBuffer.put(POISON_PILL);
                             return; // throw exception where we consume dataBuffer
                         }
+                        bufferRead += r;
                     }
                     catch (IOException e)
                     {
+                        logger.warn("Error while reading compressed input stream.", e);
                         dataBuffer.put(POISON_PILL);
-                        throw e;
+                        return; // throw exception where we consume dataBuffer
                     }
-                    bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
-            synchronized(CompressedInputStream.this)
-            {
-                readerThread = null;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        synchronized(this)
-        {
-            if (readerThread != null)
-            {
-                readerThread.interrupt();
-                readerThread = null;
-            }
         }
     }
-
 }
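
The reader-thread rewrite above drops the interrupt-based close() and instead has the background reader enqueue POISON_PILL and return on any I/O error; the failure then surfaces on the consuming side when the pill is dequeued. A minimal, self-contained sketch of that handoff (class name and messages are illustrative, not the real implementation):

    import java.io.EOFException;
    import java.io.IOException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PoisonPillSketch
    {
        static final byte[] POISON_PILL = new byte[0];
        static final BlockingQueue<byte[]> dataBuffer = new ArrayBlockingQueue<>(16);

        public static void main(String[] args) throws Exception
        {
            // Producer: stands in for the reader thread hitting an I/O error mid-stream.
            new Thread(() -> {
                try
                {
                    dataBuffer.put(new byte[]{ 1, 2, 3 }); // one good chunk
                    dataBuffer.put(POISON_PILL);           // then signal failure and return
                }
                catch (InterruptedException ignore) {}
            }).start();

            System.out.println(takeChunk().length); // 3: the good chunk arrives normally
            try
            {
                takeChunk();                        // the pill becomes an EOFException here
            }
            catch (IOException e)
            {
                System.out.println("got expected " + e);
            }
        }

        // Consumer side: the error is reported where the data is used, not in the background thread.
        static byte[] takeChunk() throws IOException
        {
            try
            {
                byte[] chunk = dataBuffer.take();
                if (chunk == POISON_PILL)
                    throw new EOFException("reader thread reported an error");
                return chunk;
            }
            catch (InterruptedException e)
            {
                throw new IOException(e);
            }
        }
    }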

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 2277943..c684e4f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 import org.slf4j.Logger;
@@ -34,10 +35,12 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
@@ -64,44 +67,59 @@ public class CompressedStreamReader extends StreamReader
     @SuppressWarnings("resource")
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt, format);
+            int sectionIdx = 0;
             for (Pair<Long, Long> section : sections)
             {
                 assert cis.getTotalCompressedBytesRead() <= totalSize;
-                int sectionLength = (int) (section.right - section.left);
+                long sectionLength = section.right - section.left;
 
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
                 // skip to beginning of section inside chunk
                 cis.position(section.left);
                 in.reset(0);
 
                 while (in.getBytesRead() < sectionLength)
                 {
-                    writeRow(writer, in, cfs);
+                    key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                    writeRow(key, writer, in, cfs);
 
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
                     session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, cis.getTotalCompressedBytesRead(), totalSize);
             return writer;
         }
         catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -120,10 +138,6 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
-        finally
-        {
-            cis.close();
-        }
     }
 
     @Override
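
The hunk above is the heart of CASSANDRA-10961: section.right - section.left is a long, and the old cast to int silently truncated it once a single section grew past Integer.MAX_VALUE (about 2GB). A tiny sketch of the truncation, with an illustrative 3GB section:

    public class SectionLengthSketch
    {
        public static void main(String[] args)
        {
            long left = 0L;
            long right = 3L * 1024 * 1024 * 1024; // a 3GB section

            int truncated = (int) (right - left); // old code: wraps to a negative value
            long correct = right - left;          // fixed code: keeps the full length

            System.out.println(truncated);        // -1073741824
            System.out.println(correct);          // 3221225472
        }
    }

With a negative sectionLength, the while (in.getBytesRead() < sectionLength) loop above would never execute for that section, so oversized sections could not be received correctly.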

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 144980c..99e9bd6 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -25,11 +25,13 @@ import java.util.List;
 
 import com.google.common.base.Function;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamSession;
@@ -43,6 +45,8 @@ public class CompressedStreamWriter extends StreamWriter
 {
     public static final int CHUNK_SIZE = 10 * 1024 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
+
     private final CompressionInfo compressionInfo;
 
     public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
@@ -55,16 +59,24 @@ public class CompressedStreamWriter extends StreamWriter
     public void write(DataOutputStreamPlus out) throws IOException
     {
         long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
         try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
         {
             long progress = 0L;
             // calculate chunks to transfer. we want to send continuous chunks altogether.
             List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+
+            int sectionIdx = 0;
+
             // stream each of the required sections of the file
             for (final Pair<Long, Long> section : sections)
             {
                 // length of the section to stream
                 long length = section.right - section.left;
+
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
                 // tracks write progress
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
@@ -84,6 +96,8 @@ public class CompressedStreamWriter extends StreamWriter
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
     }
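
On the sending side the section length was already computed as a long; the loop above transfers it a bounded amount at a time, so each individual write can stay int-sized while the running totals remain longs. A hedged sketch of one way such a bounded loop works; transferOneChunk and the exact slicing are illustrative, not the verbatim writer code:

    public class BoundedTransferSketch
    {
        static final int CHUNK_SIZE = 10 * 1024 * 1024; // 10 MB per write, as in the writer above

        public static void main(String[] args)
        {
            long length = 3L * 1024 * 1024 * 1024; // a 3GB section, kept as a long
            long bytesTransferred = 0;
            while (bytesTransferred < length)
            {
                // never ask for more than CHUNK_SIZE, and never read past the end of the section
                int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
                bytesTransferred += transferOneChunk(toTransfer);
            }
            System.out.println(bytesTransferred); // 3221225472
        }

        // Hypothetical stand-in: pretends the channel accepted everything asked for.
        static int transferOneChunk(int toTransfer)
        {
            return toTransfer;
        }
    }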
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index e692441..0becd18 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -58,52 +58,6 @@ public class CompressedInputStreamTest
     }
 
     /**
-     * Test CompressedInputStream not hang when closed while reading
-     * @throws IOException
-     */
-    @Test(expected = EOFException.class)
-    public void testClose() throws IOException
-    {
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
-        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-        InputStream blockingInput = new InputStream()
-        {
-            @Override
-            public int read() throws IOException
-            {
-                try
-                {
-                    // 10 second cut off not to stop other test in case
-                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException("Interrupted as expected", e);
-                }
-            }
-        };
-        CompressionInfo info = new CompressionInfo(chunks, param);
-        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
-        {
-            new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        cis.close();
-                    }
-                    catch (Exception ignore) {}
-                }
-            }).start();
-            // block here
-            cis.read();
-        }
-    }
-
-    /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
      */


[04/10] cassandra git commit: Fix error streaming section more than 2GB

Posted by yu...@apache.org.
Fix error streaming section more than 2GB

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10961


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/582bdba4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/582bdba4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/582bdba4

Branch: refs/heads/trunk
Commit: 582bdba4b201e6ab8e2a9a05cff3566f1bab9dce
Parents: c7f0032
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Jan 13 12:51:17 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:00:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/composites/AbstractCType.java  |  3 +-
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 26 ++++++++---
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 45 +++++++------------
 .../compress/CompressedStreamReader.java        | 32 ++++++++++----
 .../compress/CompressedStreamWriter.java        | 16 ++++++-
 .../compress/CompressedInputStreamTest.java     | 46 --------------------
 9 files changed, 86 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1554cf5..11f2529 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.5
+ * Fix error streaming section more than 2GB (CASSANDRA-10961)
  * (cqlsh) Also apply --connect-timeout to control connection
    timeout (CASSANDRA-10959)
  * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/db/composites/AbstractCType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/AbstractCType.java b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
index a982280..2190c69 100644
--- a/src/java/org/apache/cassandra/db/composites/AbstractCType.java
+++ b/src/java/org/apache/cassandra/db/composites/AbstractCType.java
@@ -359,7 +359,8 @@ public abstract class AbstractCType implements CType
     protected static void checkRemaining(ByteBuffer bb, int offs, int length)
     {
         if (offs + length > bb.limit())
-            throw new IllegalArgumentException("Not enough bytes");
+            throw new IllegalArgumentException(String.format("Not enough bytes. Offset: %d. Length: %d. Buffer size: %d",
+                                                             offs, length, bb.limit()));
     }
 
     private static class Serializer implements CType.Serializer

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 681f61e..1ec7e1c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -259,11 +259,11 @@ public class ConnectionHandler
                 {
                     // receive message
                     StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
+                    logger.debug("[Stream #{}] Received {}", session.planId(), message);
                     // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok
                     // to ignore here since we'll have asked for a retry.
                     if (message != null)
                     {
-                        logger.debug("[Stream #{}] Received {}", session.planId(), message);
                         session.messageReceived(message);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index fe3b13d..8789720 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -60,6 +60,7 @@ public class StreamReader
     protected final long repairedAt;
     protected final SSTableFormat.Type format;
     protected final int sstableLevel;
+    protected final int fileSeqNum;
 
     protected Descriptor desc;
 
@@ -73,6 +74,7 @@ public class StreamReader
         this.repairedAt = header.repairedAt;
         this.format = header.format;
         this.sstableLevel = header.sstableLevel;
+        this.fileSeqNum = header.sequenceNumber;
     }
 
     /**
@@ -83,33 +85,46 @@ public class StreamReader
     @SuppressWarnings("resource")
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt, format);
             while (in.getBytesRead() < totalSize)
             {
-                writeRow(writer, in, cfs);
+                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                writeRow(key, writer, in, cfs);
 
                 // TODO move this to BytesReadTracker
                 session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                         session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
             return writer;
         } catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -167,9 +182,8 @@ public class StreamReader
         return size;
     }
 
-    protected void writeRow(SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
+    protected void writeRow(DecoratedKey key, SSTableWriter writer, DataInput in, ColumnFamilyStore cfs) throws IOException
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
         writer.appendFromStream(key, cfs.metadata, in, inputVersion);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 106677c..721ae1e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.ning.compress.lzf.LZFOutputStream;
 
 import org.apache.cassandra.io.sstable.Component;
@@ -41,6 +44,8 @@ public class StreamWriter
 {
     private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamWriter.class);
+
     protected final SSTableReader sstable;
     protected final Collection<Pair<Long, Long>> sections;
     protected final StreamRateLimiter limiter;
@@ -70,7 +75,8 @@ public class StreamWriter
     public void write(DataOutputStreamPlus output) throws IOException
     {
         long totalSize = totalSize();
-
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
 
         try(RandomAccessReader file = sstable.openDataReader();
             ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
@@ -109,6 +115,8 @@ public class StreamWriter
                 // make sure that current section is sent
                 compressedOutput.flush();
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index daa339a..489fed9 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -30,6 +30,9 @@ import java.util.zip.Checksum;
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -38,12 +41,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
  */
 public class CompressedInputStream extends InputStream
 {
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedInputStream.class);
+
     private final CompressionInfo info;
     // chunk buffer
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private final byte[] buffer;
+    private byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -61,8 +67,6 @@ public class CompressedInputStream extends InputStream
 
     private long totalCompressedBytesRead;
 
-    private Thread readerThread;
-
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -73,10 +77,9 @@ public class CompressedInputStream extends InputStream
         this.checksum =  new Adler32();
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 
-        readerThread = new Thread(new Reader(source, info, dataBuffer));
-        readerThread.start();
+        new Thread(new Reader(source, info, dataBuffer)).start();
     }
 
     public int read() throws IOException
@@ -135,7 +138,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    class Reader extends WrappedRunnable
+    static class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -151,7 +154,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
+            while (chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -161,43 +164,25 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r;
                     try
                     {
-                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
                         if (r < 0)
                         {
                             dataBuffer.put(POISON_PILL);
                             return; // throw exception where we consume dataBuffer
                         }
+                        bufferRead += r;
                     }
                     catch (IOException e)
                     {
+                        logger.warn("Error while reading compressed input stream.", e);
                         dataBuffer.put(POISON_PILL);
-                        throw e;
+                        return; // throw exception where we consume dataBuffer
                     }
-                    bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
-            synchronized(CompressedInputStream.this)
-            {
-                readerThread = null;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        synchronized(this)
-        {
-            if (readerThread != null)
-            {
-                readerThread.interrupt();
-                readerThread = null;
-            }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 2277943..c684e4f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
 import org.slf4j.Logger;
@@ -34,10 +35,12 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
 
@@ -64,44 +67,59 @@ public class CompressedStreamReader extends StreamReader
     @SuppressWarnings("resource")
     public SSTableWriter read(ReadableByteChannel channel) throws IOException
     {
-        logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
 
         Pair<String, String> kscf = Schema.instance.getCF(cfId);
-        if (kscf == null)
+        ColumnFamilyStore cfs = null;
+        if (kscf != null)
+            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        if (kscf == null || cfs == null)
         {
             // schema was dropped during streaming
             throw new IOException("CF " + cfId + " was dropped during streaming");
         }
-        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getColumnFamilyName());
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
         SSTableWriter writer = null;
+        DecoratedKey key = null;
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt, format);
+            int sectionIdx = 0;
             for (Pair<Long, Long> section : sections)
             {
                 assert cis.getTotalCompressedBytesRead() <= totalSize;
-                int sectionLength = (int) (section.right - section.left);
+                long sectionLength = section.right - section.left;
 
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
                 // skip to beginning of section inside chunk
                 cis.position(section.left);
                 in.reset(0);
 
                 while (in.getBytesRead() < sectionLength)
                 {
-                    writeRow(writer, in, cfs);
+                    key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+                    writeRow(key, writer, in, cfs);
 
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
                     session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, cis.getTotalCompressedBytesRead(), totalSize);
             return writer;
         }
         catch (Throwable e)
         {
+            if (key != null)
+                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
             if (writer != null)
             {
                 try
@@ -120,10 +138,6 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
-        finally
-        {
-            cis.close();
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 144980c..99e9bd6 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -25,11 +25,13 @@ import java.util.List;
 
 import com.google.common.base.Function;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamSession;
@@ -43,6 +45,8 @@ public class CompressedStreamWriter extends StreamWriter
 {
     public static final int CHUNK_SIZE = 10 * 1024 * 1024;
 
+    private static final Logger logger = LoggerFactory.getLogger(CompressedStreamWriter.class);
+
     private final CompressionInfo compressionInfo;
 
     public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
@@ -55,16 +59,24 @@ public class CompressedStreamWriter extends StreamWriter
     public void write(DataOutputStreamPlus out) throws IOException
     {
         long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
         try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
         {
             long progress = 0L;
             // calculate chunks to transfer. we want to send continuous chunks altogether.
             List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+
+            int sectionIdx = 0;
+
             // stream each of the required sections of the file
             for (final Pair<Long, Long> section : sections)
             {
                 // length of the section to stream
                 long length = section.right - section.left;
+
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
                 // tracks write progress
                 long bytesTransferred = 0;
                 while (bytesTransferred < length)
@@ -84,6 +96,8 @@ public class CompressedStreamWriter extends StreamWriter
                     session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
                 }
             }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, progress, totalSize);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/582bdba4/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index e692441..0becd18 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -58,52 +58,6 @@ public class CompressedInputStreamTest
     }
 
     /**
-     * Test CompressedInputStream not hang when closed while reading
-     * @throws IOException
-     */
-    @Test(expected = EOFException.class)
-    public void testClose() throws IOException
-    {
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
-        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-        InputStream blockingInput = new InputStream()
-        {
-            @Override
-            public int read() throws IOException
-            {
-                try
-                {
-                    // 10 second cut off not to stop other test in case
-                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException("Interrupted as expected", e);
-                }
-            }
-        };
-        CompressionInfo info = new CompressionInfo(chunks, param);
-        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
-        {
-            new Thread(new Runnable()
-            {
-                @Override
-                public void run()
-                {
-                    try
-                    {
-                        cis.close();
-                    }
-                    catch (Exception ignore) {}
-                }
-            }).start();
-            // block here
-            cis.read();
-        }
-    }
-
-    /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
      */


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f995a2d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f995a2d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f995a2d

Branch: refs/heads/cassandra-3.0
Commit: 0f995a2dc7a116ec6def110e10af6bb9acc9f7b3
Parents: 95012da 582bdba
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:13:11 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:13:11 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 20 +++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 38786c1,11f2529..614d5b4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 -2.2.5
 +3.0.3
 + * Fix AssertionError when removing from list using UPDATE (CASSANDRA-10954)
 + * Fix UnsupportedOperationException when reading old sstable with range
 +   tombstone (CASSANDRA-10743)
 + * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
 + * Fix potential assertion error during compaction (CASSANDRA-10944)
 + * Fix counting of received sstables in streaming (CASSANDRA-10949)
 + * Implement hints compression (CASSANDRA-9428)
 + * Fix potential assertion error when reading static columns (CASSANDRA-10903)
 + * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711)
 + * Avoid building PartitionUpdate in toString (CASSANDRA-10897)
 + * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
 + * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
 + * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
 + * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
 + * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 +Merged from 2.2:
+  * Fix error streaming section more than 2GB (CASSANDRA-10961)
   * (cqlsh) Also apply --connect-timeout to control connection
     timeout (CASSANDRA-10959)
   * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)
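
The 2 GB figure in the subject line is the int-overflow boundary: a stream "section" is a (start, end) pair of byte offsets into the sstable data file, and once its span exceeds Integer.MAX_VALUE a value narrowed to int goes negative. A standalone illustration of that hazard and of the long-based pattern the fix relies on (illustrative only, not the actual Cassandra code):

    public final class SectionLengthDemo
    {
        public static void main(String[] args)
        {
            long start = 0L;
            long end = 3_000_000_000L;        // a section spanning ~3 GB

            int broken = (int) (end - start); // overflows to -1294967296
            long safe = end - start;          // stays correct as a long

            System.out.println("int-cast length: " + broken);
            System.out.println("long length:     " + safe);

            // Streaming code therefore keeps section offsets and lengths in longs
            // and only narrows to int for the size of a single transfer chunk,
            // which is bounded far below 2 GB.
            int chunk = (int) Math.min(64 * 1024, safe);
            System.out.println("per-chunk size:  " + chunk);
        }
    }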

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 87dcda0,8789720..268f974
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -61,7 -60,7 +61,8 @@@ public class StreamReade
      protected final long repairedAt;
      protected final SSTableFormat.Type format;
      protected final int sstableLevel;
 +    protected final SerializationHeader.Component header;
+     protected final int fileSeqNum;
  
      protected Descriptor desc;
  
@@@ -75,7 -74,7 +76,8 @@@
          this.repairedAt = header.repairedAt;
          this.format = header.format;
          this.sstableLevel = header.sstableLevel;
 +        this.header = header.header;
+         this.fileSeqNum = header.sequenceNumber;
      }
  
      /**
@@@ -83,10 -82,9 +85,9 @@@
       * @return SSTable transferred
       * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
 -    @SuppressWarnings("resource")
 -    public SSTableWriter read(ReadableByteChannel channel) throws IOException
 +    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
 +    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
          long totalSize = totalSize();
  
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -110,13 -117,25 +118,18 @@@
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                          session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
              return writer;
 -        } catch (Throwable e)
 +        }
 +        catch (Throwable e)
          {
 -            if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
@@@ -126,16 -145,14 +139,15 @@@
          }
      }
  
 -    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
 +    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
      {
 -        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
 +        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
              throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
- 
 -        desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 +        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
  
 -        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel);
 +
 +        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
      }
  
      protected void drain(InputStream dis, long bytesRead) throws IOException
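
The error path in the hunk above aborts the local writer and then drains the rest of the incoming file so the session can keep reading subsequent files from the same channel. A rough sketch of that drain step, with hypothetical names rather than the real StreamReader API:

    import java.io.IOException;
    import java.io.InputStream;

    final class DrainSketch
    {
        // Skip whatever is left of the current file on the stream so the next
        // file in the stream plan starts at the expected offset.
        static void drain(InputStream in, long bytesAlreadyRead, long totalSize) throws IOException
        {
            long toSkip = totalSize - bytesAlreadyRead;
            while (toSkip > 0)
            {
                long skipped = in.skip(toSkip);
                if (skipped <= 0)
                {
                    if (in.read() < 0)
                        break;       // end of stream; nothing more to discard
                    skipped = 1;     // skip() made no progress, so one byte was read instead
                }
                toSkip -= skipped;
            }
        }
    }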

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 56dc63a,489fed9..55ac7ac
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -30,8 -30,10 +30,11 @@@ import java.util.zip.Checksum
  import com.google.common.collect.Iterators;
  import com.google.common.primitives.Ints;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.utils.ChecksumType;
  import org.apache.cassandra.utils.WrappedRunnable;
  
  /**
@@@ -69,17 -71,15 +73,16 @@@ public class CompressedInputStream exte
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum =  new Adler32();
 +        this.checksum =  checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
 -        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
 +        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
  
-         readerThread = new Thread(new Reader(source, info, dataBuffer));
-         readerThread.start();
+         new Thread(new Reader(source, info, dataBuffer)).start();
      }
  
      public int read() throws IOException
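
The constructor in this hunk hands the socket stream to a background Reader and buffers incoming chunks on an ArrayBlockingQueue capped at min(chunk count, 1024), so a slow consumer cannot pull the whole file into memory. A compact sketch of that bounded producer/consumer shape, using invented names and plain java.io rather than the real Reader/CompressedInputStream classes:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    final class ChunkPump
    {
        private final BlockingQueue<byte[]> queue;

        ChunkPump(InputStream source, int chunkLength, int chunkCount)
        {
            // bounded queue: at most 1024 chunks in flight, as in the constructor above
            queue = new ArrayBlockingQueue<>(Math.min(chunkCount, 1024));
            Thread reader = new Thread(() -> {
                try
                {
                    byte[] chunk = new byte[chunkLength];
                    int n;
                    while ((n = source.read(chunk)) > 0)
                    {
                        byte[] copy = new byte[n];
                        System.arraycopy(chunk, 0, copy, 0, n);
                        queue.put(copy);   // blocks when the queue is full
                    }
                }
                catch (IOException | InterruptedException ignore) {}
            });
            reader.setDaemon(true);
            reader.start();
        }

        byte[] nextChunk() throws InterruptedException
        {
            return queue.take();           // blocks until the reader supplies a chunk
        }
    }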

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4d10244,c684e4f..5210d5b
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,7 -25,8 +24,8 @@@ import java.nio.channels.ReadableByteCh
  
  import com.google.common.base.Throwables;
  
+ import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -60,10 -64,9 +61,9 @@@ public class CompressedStreamReader ext
       * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
       */
      @Override
 -    @SuppressWarnings("resource")
 -    public SSTableWriter read(ReadableByteChannel channel) throws IOException
 +    @SuppressWarnings("resource") // channel needs to remain open, streams on top of it can't be closed
 +    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
      {
-         logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
          long totalSize = totalSize();
  
          Pair<String, String> kscf = Schema.instance.getCF(cfId);
@@@ -72,13 -79,15 +76,16 @@@
              // schema was dropped during streaming
              throw new IOException("CF " + cfId + " was dropped during streaming");
          }
-         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+ 
+         logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
+                      session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                      cfs.getColumnFamilyName());
  
 -        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
 +        CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
 +                                                              inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
 -        SSTableWriter writer = null;
 -        DecoratedKey key = null;
 +        StreamDeserializer deserializer = new StreamDeserializer(cfs.metadata, in, inputVersion, header.toHeader(cfs.metadata));
 +        SSTableMultiWriter writer = null;
          try
          {
              writer = createWriter(cfs, totalSize, repairedAt, format);
@@@ -102,9 -117,20 +113,12 @@@
          }
          catch (Throwable e)
          {
 -            if (key != null)
++            if (deserializer != null)
+                 logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
 -                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
++                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(), cfs.getColumnFamilyName());
              if (writer != null)
              {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can drain unread stream
 -                    e.addSuppressed(e2);
 -                }
 +                writer.abort(e);
              }
              drain(cis, in.getBytesRead());
              if (e instanceof IOException)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index adbd091,99e9bd6..f37af29
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@@ -55,7 -59,9 +60,9 @@@ public class CompressedStreamWriter ext
      public void write(DataOutputStreamPlus out) throws IOException
      {
          long totalSize = totalSize();
+         logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                      sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
 -        try (RandomAccessReader file = sstable.openDataReader(); final ChannelProxy fc = file.getChannel())
 +        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
          {
              long progress = 0L;
              // calculate chunks to transfer. we want to send continuous chunks altogether.
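
The comment closing this hunk ("send continuous chunks altogether") refers to coalescing adjacent compressed chunks into contiguous (start, end) sections before transferring them. A hedged sketch of that coalescing step with hypothetical types; the real code works on CompressionMetadata.Chunk, and section offsets stay in longs so a section larger than 2 GB does not overflow an int:

    import java.util.ArrayList;
    import java.util.List;

    final class ChunkCoalescer
    {
        static final class Chunk
        {
            final long offset;
            final int length;
            Chunk(long offset, int length) { this.offset = offset; this.length = length; }
        }

        static final class Section
        {
            final long start;
            long end;
            Section(long start, long end) { this.start = start; this.end = end; }
        }

        // Assumes chunks arrive sorted by offset, as compression metadata lists them.
        static List<Section> coalesce(List<Chunk> chunks)
        {
            List<Section> sections = new ArrayList<>();
            Section current = null;
            for (Chunk c : chunks)
            {
                if (current != null && current.end == c.offset)
                    current.end = c.offset + c.length;   // contiguous: extend the open section
                else
                    sections.add(current = new Section(c.offset, c.offset + c.length));
            }
            return sections;
        }
    }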

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f995a2d/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 2162e32,0000000..a3300ac
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,183 -1,0 +1,137 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.SynchronousQueue;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
 +
 +    /**
-      * Test CompressedInputStream not hang when closed while reading
-      * @throws IOException
-      */
-     @Test(expected = EOFException.class)
-     public void testClose() throws IOException
-     {
-         CompressionParams param = CompressionParams.snappy(32);
-         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
-         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
-         InputStream blockingInput = new InputStream()
-         {
-             @Override
-             public int read() throws IOException
-             {
-                 try
-                 {
-                     // 10 second cut off not to stop other test in case
-                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
-                 }
-                 catch (InterruptedException e)
-                 {
-                     throw new IOException("Interrupted as expected", e);
-                 }
-             }
-         };
-         CompressionInfo info = new CompressionInfo(chunks, param);
-         try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
-         {
-             new Thread(new Runnable()
-             {
-                 @Override
-                 public void run()
-                 {
-                     try
-                     {
-                         cis.close();
-                     }
-                     catch (Exception ignore) {}
-                 }
-             }).start();
-             // block here
-             cis.read();
-         }
-     }
- 
-     /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
 +        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
 +            for (long l = 0L; l < 1000; l++)
 +            {
 +                index.put(l, writer.position());
 +                writer.writeLong(l);
 +            }
 +            writer.finish();
 +        }
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
 +        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
 +        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
 +            int pos = 0;
 +            for (CompressionMetadata.Chunk c : chunks)
 +            {
 +                f.seek(c.offset);
 +                pos += f.read(toRead, pos, c.length + 4);
 +            }
 +        }
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
 +
 +        try (DataInputStream in = new DataInputStream(input))
 +        {
 +            for (int i = 0; i < sections.size(); i++)
 +            {
 +                input.position(sections.get(i).left);
 +                long readValue = in.readLong();
 +                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
 +            }
 +        }
 +    }
 +}