You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/23 21:00:39 UTC

[01/15] cassandra git commit: Fix CompressedInputStream for proper cleanup

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 4a94f75b0 -> 8b9a9161c
  refs/heads/cassandra-2.2 2aa834265 -> 056055feb
  refs/heads/cassandra-3.0 9fe790d75 -> 0b3cfae4e
  refs/heads/cassandra-3.1 e0c945228 -> e8737fda3
  refs/heads/trunk 440366edd -> fa4c17383


Fix CompressedInputStream for proper cleanup

patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161

Branch: refs/heads/cassandra-2.1
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
  * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
  * Try next replica if not possible to connect to primary replica on
    ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private byte[] buffer;
+    private final byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
+    private Thread readerThread;
+
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        readerThread = new Thread(new Reader(source, info, dataBuffer));
+        readerThread.start();
     }
 
     public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    static class Reader extends WrappedRunnable
+    class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (chunks.hasNext())
+            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
-                    if (r < 0)
+                    int r;
+                    try
+                    {
+                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        if (r < 0)
+                        {
+                            dataBuffer.put(POISON_PILL);
+                            return; // throw exception where we consume dataBuffer
+                        }
+                    }
+                    catch (IOException e)
                     {
                         dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
+                        throw e;
                     }
                     bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
+            synchronized(CompressedInputStream.this)
+            {
+                readerThread = null;
+            }
         }
     }
+
+    @Override
+    public void close() throws IOException
+    {
+        synchronized(this)
+        {
+            if (readerThread != null)
+            {
+                readerThread.interrupt();
+                readerThread = null;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            cis.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
     {
         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
     }
+
+    /**
+     * Test CompressedInputStream not hang when closed while reading
+     * @throws Exception
+     */
+    @Test(expected = EOFException.class)
+    public void testClose() throws Exception
+    {
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+        InputStream blockingInput = new InputStream()
+        {
+            @Override
+            public int read() throws IOException
+            {
+                try
+                {
+                    // 10 second cut off not to stop other test in case
+                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException("Interrupted as expected", e);
+                }
+            }
+        };
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+        {
+            new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        cis.close();
+                    }
+                    catch (Exception ignore) {}
+                }
+            }).start();
+            // block here
+            cis.read();
+        }
+    }
+
     /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-        Map<Long, Long> index = new HashMap<Long, Long>();
-        for (long l = 0L; l < 1000; l++)
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        Map<Long, Long> index = new HashMap<>();
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
         {
-            index.put(l, writer.getFilePointer());
-            writer.stream.writeLong(l);
+            for (long l = 0L; l < 1000; l++)
+            {
+                index.put(l, writer.getFilePointer());
+                writer.stream.writeLong(l);
+            }
+            writer.close();
         }
-        writer.close();
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        List<Pair<Long, Long>> sections = new ArrayList<>();
         for (long l : valuesToCheck)
         {
             long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
             size += (c.length + 4); // 4bytes CRC
         byte[] toRead = new byte[size];
 
-        RandomAccessFile f = new RandomAccessFile(tmp, "r");
-        int pos = 0;
-        for (CompressionMetadata.Chunk c : chunks)
+        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
         {
-            f.seek(c.offset);
-            pos += f.read(toRead, pos, c.length + 4);
+            int pos = 0;
+            for (CompressionMetadata.Chunk c : chunks)
+            {
+                f.seek(c.offset);
+                pos += f.read(toRead, pos, c.length + 4);
+            }
         }
-        f.close();
 
         if (testTruncate)
         {
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
-        DataInputStream in = new DataInputStream(input);
 
-        for (int i = 0; i < sections.size(); i++)
+        try (DataInputStream in = new DataInputStream(input))
         {
-            input.position(sections.get(i).left);
-            long exp = in.readLong();
-            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                long readValue = in.readLong();
+                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+            }
         }
     }
 }


[13/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8737fda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8737fda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8737fda

Branch: refs/heads/cassandra-3.1
Commit: e8737fda3aea7ac28fe04c02fa687d2606b8d6c8
Parents: e0c9452 0b3cfae
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:59:04 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:59:04 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 ++++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compression/CompressedInputStreamTest.java  | 88 ++++++++++++++++----
 4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8737fda/CHANGES.txt
----------------------------------------------------------------------


[06/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe

Branch: refs/heads/trunk
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 -2.1.12
 +2.2.4
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
      private static final byte[] POISON_PILL = new byte[0];
  
      private long totalCompressedBytesRead;
 -    private final boolean hasPostCompressionAdlerChecksums;
  
+     private Thread readerThread;
+ 
      /**
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
 +    public CompressedInputStream(InputStream source, CompressionInfo info)
      {
          this.info = info;
 -        this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
 -        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
 +        this.checksum =  new Adler32();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
      {
          testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
      }
+ 
+     /**
+      * Test CompressedInputStream not hang when closed while reading
 -     * @throws Exception
++     * @throws IOException
+      */
+     @Test(expected = EOFException.class)
 -    public void testClose() throws Exception
++    public void testClose() throws IOException
+     {
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+         InputStream blockingInput = new InputStream()
+         {
+             @Override
+             public int read() throws IOException
+             {
+                 try
+                 {
+                     // 10 second cut off not to stop other test in case
+                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                 }
+                 catch (InterruptedException e)
+                 {
+                     throw new IOException("Interrupted as expected", e);
+                 }
+             }
+         };
+         CompressionInfo info = new CompressionInfo(chunks, param);
 -        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+         {
+             new Thread(new Runnable()
+             {
+                 @Override
+                 public void run()
+                 {
+                     try
+                     {
+                         cis.close();
+                     }
+                     catch (Exception ignore) {}
+                 }
+             }).start();
+             // block here
+             cis.read();
+         }
+     }
+ 
      /**
       * @param valuesToCheck array of longs of range(0-999)
       * @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
          File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
          Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
          MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-         Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         Map<Long, Long> index = new HashMap<>();
+         try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
          {
-             index.put(l, writer.getFilePointer());
-             writer.stream.writeLong(l);
+             for (long l = 0L; l < 1000; l++)
+             {
+                 index.put(l, writer.getFilePointer());
+                 writer.stream.writeLong(l);
+             }
 -            writer.close();
++            writer.finish();
          }
-         writer.finish();
  
          CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+         List<Pair<Long, Long>> sections = new ArrayList<>();
          for (long l : valuesToCheck)
          {
              long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
  
          // read buffer using CompressedInputStream
          CompressionInfo info = new CompressionInfo(chunks, param);
 -        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
-         DataInputStream in = new DataInputStream(input);
  
-         for (int i = 0; i < sections.size(); i++)
+         try (DataInputStream in = new DataInputStream(input))
          {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+             for (int i = 0; i < sections.size(); i++)
+             {
+                 input.position(sections.get(i).left);
+                 long readValue = in.readLong();
+                 assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+             }
          }
      }
  }


[11/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b3cfae4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b3cfae4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b3cfae4

Branch: refs/heads/cassandra-3.1
Commit: 0b3cfae4e619d1ece5ff8afc774eeb52b93166d8
Parents: 9fe790d 056055f
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:58:58 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:58:58 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 ++++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compression/CompressedInputStreamTest.java  | 88 ++++++++++++++++----
 4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4182cc1,d11be26..608d8f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -11,7 -3,16 +11,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index ccd0ac5,daa339a..56dc63a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -42,10 -41,9 +42,10 @@@ public class CompressedInputStream exte
      private final CompressionInfo info;
      // chunk buffer
      private final BlockingQueue<byte[]> dataBuffer;
 +    private final Supplier<Double> crcCheckChanceSupplier;
  
      // uncompressed bytes
-     private byte[] buffer;
+     private final byte[] buffer;
  
      // offset from the beginning of the buffer
      protected long bufferOffset = 0;
@@@ -67,16 -67,16 +69,17 @@@
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum =  new Adler32();
 +        this.checksum =  checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 5646592,0000000..2162e32
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,129 -1,0 +1,183 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
++
++    /**
++     * Test that CompressedInputStream does not hang when closed while reading
++     * @throws IOException
++     */
++    @Test(expected = EOFException.class)
++    public void testClose() throws IOException
++    {
++        CompressionParams param = CompressionParams.snappy(32);
++        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
++        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
++        InputStream blockingInput = new InputStream()
++        {
++            @Override
++            public int read() throws IOException
++            {
++                try
++                {
++                    // 10-second cutoff so that a hang here does not block other tests
++                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
++                }
++                catch (InterruptedException e)
++                {
++                    throw new IOException("Interrupted as expected", e);
++                }
++            }
++        };
++        CompressionInfo info = new CompressionInfo(chunks, param);
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
++        {
++            new Thread(new Runnable()
++            {
++                @Override
++                public void run()
++                {
++                    try
++                    {
++                        cis.close();
++                    }
++                    catch (Exception ignore) {}
++                }
++            }).start();
++            // block here
++            cis.read();
++        }
++    }
++
 +    /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
++        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
-             index.put(l, writer.position());
-             writer.writeLong(l);
++            for (long l = 0L; l < 1000; l++)
++            {
++                index.put(l, writer.position());
++                writer.writeLong(l);
++            }
++            writer.finish();
 +        }
-         writer.finish();
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
++        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
-         RandomAccessFile f = new RandomAccessFile(tmp, "r");
-         int pos = 0;
-         for (CompressionMetadata.Chunk c : chunks)
++        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
-             f.seek(c.offset);
-             pos += f.read(toRead, pos, c.length + 4);
++            int pos = 0;
++            for (CompressionMetadata.Chunk c : chunks)
++            {
++                f.seek(c.offset);
++                pos += f.read(toRead, pos, c.length + 4);
++            }
 +        }
-         f.close();
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
-         DataInputStream in = new DataInputStream(input);
 +
-         for (int i = 0; i < sections.size(); i++)
++        try (DataInputStream in = new DataInputStream(input))
 +        {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
++            for (int i = 0; i < sections.size(); i++)
++            {
++                input.position(sections.get(i).left);
++                long readValue = in.readLong();
++                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
++            }
 +        }
 +    }
 +}


[15/15] cassandra git commit: Merge branch 'cassandra-3.1' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fa4c1738
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fa4c1738
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fa4c1738

Branch: refs/heads/trunk
Commit: fa4c17383a168d6eb3a84eac92d98dadcd4ba373
Parents: 440366e e8737fd
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 14:00:19 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 14:00:19 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 ++++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compression/CompressedInputStreamTest.java  | 88 ++++++++++++++++----
 4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4c1738/CHANGES.txt
----------------------------------------------------------------------


[14/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8737fda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8737fda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8737fda

Branch: refs/heads/trunk
Commit: e8737fda3aea7ac28fe04c02fa687d2606b8d6c8
Parents: e0c9452 0b3cfae
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:59:04 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:59:04 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 ++++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compression/CompressedInputStreamTest.java  | 88 ++++++++++++++++----
 4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8737fda/CHANGES.txt
----------------------------------------------------------------------


[09/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe

Branch: refs/heads/cassandra-3.1
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 -2.1.12
 +2.2.4
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
      private static final byte[] POISON_PILL = new byte[0];
  
      private long totalCompressedBytesRead;
 -    private final boolean hasPostCompressionAdlerChecksums;
  
+     private Thread readerThread;
+ 
      /**
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
 +    public CompressedInputStream(InputStream source, CompressionInfo info)
      {
          this.info = info;
 -        this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
 -        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
 +        this.checksum =  new Adler32();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
      {
          testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
      }
+ 
+     /**
+      * Test that CompressedInputStream does not hang when closed while reading
 -     * @throws Exception
++     * @throws IOException
+      */
+     @Test(expected = EOFException.class)
 -    public void testClose() throws Exception
++    public void testClose() throws IOException
+     {
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+         InputStream blockingInput = new InputStream()
+         {
+             @Override
+             public int read() throws IOException
+             {
+                 try
+                 {
+                     // 10-second cutoff so that a hang here does not block other tests
+                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                 }
+                 catch (InterruptedException e)
+                 {
+                     throw new IOException("Interrupted as expected", e);
+                 }
+             }
+         };
+         CompressionInfo info = new CompressionInfo(chunks, param);
 -        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+         {
+             new Thread(new Runnable()
+             {
+                 @Override
+                 public void run()
+                 {
+                     try
+                     {
+                         cis.close();
+                     }
+                     catch (Exception ignore) {}
+                 }
+             }).start();
+             // block here
+             cis.read();
+         }
+     }
+ 
      /**
       * @param valuesToCheck array of longs of range(0-999)
       * @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
          File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
          Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
          MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-         Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         Map<Long, Long> index = new HashMap<>();
+         try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
          {
-             index.put(l, writer.getFilePointer());
-             writer.stream.writeLong(l);
+             for (long l = 0L; l < 1000; l++)
+             {
+                 index.put(l, writer.getFilePointer());
+                 writer.stream.writeLong(l);
+             }
 -            writer.close();
++            writer.finish();
          }
-         writer.finish();
  
          CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+         List<Pair<Long, Long>> sections = new ArrayList<>();
          for (long l : valuesToCheck)
          {
              long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
  
          // read buffer using CompressedInputStream
          CompressionInfo info = new CompressionInfo(chunks, param);
 -        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
-         DataInputStream in = new DataInputStream(input);
  
-         for (int i = 0; i < sections.size(); i++)
+         try (DataInputStream in = new DataInputStream(input))
          {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+             for (int i = 0; i < sections.size(); i++)
+             {
+                 input.position(sections.get(i).left);
+                 long readValue = in.readLong();
+                 assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+             }
          }
      }
  }


[05/15] cassandra git commit: Fix CompressedInputStream for proper cleanup

Posted by yu...@apache.org.
Fix CompressedInputStream for proper cleanup

patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161

Branch: refs/heads/cassandra-3.1
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
  * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
  * Try next replica if not possible to connect to primary replica on
    ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private byte[] buffer;
+    private final byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
+    private Thread readerThread;
+
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        readerThread = new Thread(new Reader(source, info, dataBuffer));
+        readerThread.start();
     }
 
     public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    static class Reader extends WrappedRunnable
+    class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (chunks.hasNext())
+            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
-                    if (r < 0)
+                    int r;
+                    try
+                    {
+                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        if (r < 0)
+                        {
+                            dataBuffer.put(POISON_PILL);
+                            return; // throw exception where we consume dataBuffer
+                        }
+                    }
+                    catch (IOException e)
                     {
                         dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
+                        throw e;
                     }
                     bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
+            synchronized(CompressedInputStream.this)
+            {
+                readerThread = null;
+            }
         }
     }
+
+    @Override
+    public void close() throws IOException
+    {
+        synchronized(this)
+        {
+            if (readerThread != null)
+            {
+                readerThread.interrupt();
+                readerThread = null;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            cis.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
     {
         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
     }
+
+    /**
+     * Test that CompressedInputStream does not hang when closed while reading
+     * @throws Exception
+     */
+    @Test(expected = EOFException.class)
+    public void testClose() throws Exception
+    {
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+        InputStream blockingInput = new InputStream()
+        {
+            @Override
+            public int read() throws IOException
+            {
+                try
+                {
+                    // 10-second cutoff so that a hang here does not block other tests
+                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException("Interrupted as expected", e);
+                }
+            }
+        };
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+        {
+            new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        cis.close();
+                    }
+                    catch (Exception ignore) {}
+                }
+            }).start();
+            // block here
+            cis.read();
+        }
+    }
+
     /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-        Map<Long, Long> index = new HashMap<Long, Long>();
-        for (long l = 0L; l < 1000; l++)
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        Map<Long, Long> index = new HashMap<>();
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
         {
-            index.put(l, writer.getFilePointer());
-            writer.stream.writeLong(l);
+            for (long l = 0L; l < 1000; l++)
+            {
+                index.put(l, writer.getFilePointer());
+                writer.stream.writeLong(l);
+            }
+            writer.close();
         }
-        writer.close();
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        List<Pair<Long, Long>> sections = new ArrayList<>();
         for (long l : valuesToCheck)
         {
             long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
             size += (c.length + 4); // 4bytes CRC
         byte[] toRead = new byte[size];
 
-        RandomAccessFile f = new RandomAccessFile(tmp, "r");
-        int pos = 0;
-        for (CompressionMetadata.Chunk c : chunks)
+        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
         {
-            f.seek(c.offset);
-            pos += f.read(toRead, pos, c.length + 4);
+            int pos = 0;
+            for (CompressionMetadata.Chunk c : chunks)
+            {
+                f.seek(c.offset);
+                pos += f.read(toRead, pos, c.length + 4);
+            }
         }
-        f.close();
 
         if (testTruncate)
         {
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
-        DataInputStream in = new DataInputStream(input);
 
-        for (int i = 0; i < sections.size(); i++)
+        try (DataInputStream in = new DataInputStream(input))
         {
-            input.position(sections.get(i).left);
-            long exp = in.readLong();
-            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                long readValue = in.readLong();
+                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+            }
         }
     }
 }


[04/15] cassandra git commit: Fix CompressedInputStream for proper cleanup

Posted by yu...@apache.org.
Fix CompressedInputStream for proper cleanup

patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161

Branch: refs/heads/cassandra-3.0
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
  * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
  * Try next replica if not possible to connect to primary replica on
    ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private byte[] buffer;
+    private final byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
+    private Thread readerThread;
+
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        readerThread = new Thread(new Reader(source, info, dataBuffer));
+        readerThread.start();
     }
 
     public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    static class Reader extends WrappedRunnable
+    class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (chunks.hasNext())
+            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
-                    if (r < 0)
+                    int r;
+                    try
+                    {
+                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        if (r < 0)
+                        {
+                            dataBuffer.put(POISON_PILL);
+                            return; // throw exception where we consume dataBuffer
+                        }
+                    }
+                    catch (IOException e)
                     {
                         dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
+                        throw e;
                     }
                     bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
+            synchronized(CompressedInputStream.this)
+            {
+                readerThread = null;
+            }
         }
     }
+
+    @Override
+    public void close() throws IOException
+    {
+        synchronized(this)
+        {
+            if (readerThread != null)
+            {
+                readerThread.interrupt();
+                readerThread = null;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            cis.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
     {
         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
     }
+
+    /**
+     * Test CompressedInputStream not hang when closed while reading
+     * @throws Exception
+     */
+    @Test(expected = EOFException.class)
+    public void testClose() throws Exception
+    {
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+        InputStream blockingInput = new InputStream()
+        {
+            @Override
+            public int read() throws IOException
+            {
+                try
+                {
+                    // 10 second cut off not to stop other test in case
+                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException("Interrupted as expected", e);
+                }
+            }
+        };
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+        {
+            new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        cis.close();
+                    }
+                    catch (Exception ignore) {}
+                }
+            }).start();
+            // block here
+            cis.read();
+        }
+    }
+
     /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-        Map<Long, Long> index = new HashMap<Long, Long>();
-        for (long l = 0L; l < 1000; l++)
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        Map<Long, Long> index = new HashMap<>();
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
         {
-            index.put(l, writer.getFilePointer());
-            writer.stream.writeLong(l);
+            for (long l = 0L; l < 1000; l++)
+            {
+                index.put(l, writer.getFilePointer());
+                writer.stream.writeLong(l);
+            }
+            writer.close();
         }
-        writer.close();
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        List<Pair<Long, Long>> sections = new ArrayList<>();
         for (long l : valuesToCheck)
         {
             long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
             size += (c.length + 4); // 4bytes CRC
         byte[] toRead = new byte[size];
 
-        RandomAccessFile f = new RandomAccessFile(tmp, "r");
-        int pos = 0;
-        for (CompressionMetadata.Chunk c : chunks)
+        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
         {
-            f.seek(c.offset);
-            pos += f.read(toRead, pos, c.length + 4);
+            int pos = 0;
+            for (CompressionMetadata.Chunk c : chunks)
+            {
+                f.seek(c.offset);
+                pos += f.read(toRead, pos, c.length + 4);
+            }
         }
-        f.close();
 
         if (testTruncate)
         {
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
-        DataInputStream in = new DataInputStream(input);
 
-        for (int i = 0; i < sections.size(); i++)
+        try (DataInputStream in = new DataInputStream(input))
         {
-            input.position(sections.get(i).left);
-            long exp = in.readLong();
-            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                long readValue = in.readLong();
+                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+            }
         }
     }
 }


[12/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b3cfae4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b3cfae4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b3cfae4

Branch: refs/heads/cassandra-3.0
Commit: 0b3cfae4e619d1ece5ff8afc774eeb52b93166d8
Parents: 9fe790d 056055f
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:58:58 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:58:58 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 ++++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compression/CompressedInputStreamTest.java  | 88 ++++++++++++++++----
 4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4182cc1,d11be26..608d8f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -11,7 -3,16 +11,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index ccd0ac5,daa339a..56dc63a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -42,10 -41,9 +42,10 @@@ public class CompressedInputStream exte
      private final CompressionInfo info;
      // chunk buffer
      private final BlockingQueue<byte[]> dataBuffer;
 +    private final Supplier<Double> crcCheckChanceSupplier;
  
      // uncompressed bytes
-     private byte[] buffer;
+     private final byte[] buffer;
  
      // offset from the beginning of the buffer
      protected long bufferOffset = 0;
@@@ -67,16 -67,16 +69,17 @@@
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum =  new Adler32();
 +        this.checksum =  checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 5646592,0000000..2162e32
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,129 -1,0 +1,183 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
++
++    /**
++     * Test CompressedInputStream not hang when closed while reading
++     * @throws IOException
++     */
++    @Test(expected = EOFException.class)
++    public void testClose() throws IOException
++    {
++        CompressionParams param = CompressionParams.snappy(32);
++        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
++        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
++        InputStream blockingInput = new InputStream()
++        {
++            @Override
++            public int read() throws IOException
++            {
++                try
++                {
++                    // 10 second cut off not to stop other test in case
++                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
++                }
++                catch (InterruptedException e)
++                {
++                    throw new IOException("Interrupted as expected", e);
++                }
++            }
++        };
++        CompressionInfo info = new CompressionInfo(chunks, param);
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
++        {
++            new Thread(new Runnable()
++            {
++                @Override
++                public void run()
++                {
++                    try
++                    {
++                        cis.close();
++                    }
++                    catch (Exception ignore) {}
++                }
++            }).start();
++            // block here
++            cis.read();
++        }
++    }
++
 +    /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
++        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
-             index.put(l, writer.position());
-             writer.writeLong(l);
++            for (long l = 0L; l < 1000; l++)
++            {
++                index.put(l, writer.position());
++                writer.writeLong(l);
++            }
++            writer.finish();
 +        }
-         writer.finish();
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
++        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
-         RandomAccessFile f = new RandomAccessFile(tmp, "r");
-         int pos = 0;
-         for (CompressionMetadata.Chunk c : chunks)
++        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
-             f.seek(c.offset);
-             pos += f.read(toRead, pos, c.length + 4);
++            int pos = 0;
++            for (CompressionMetadata.Chunk c : chunks)
++            {
++                f.seek(c.offset);
++                pos += f.read(toRead, pos, c.length + 4);
++            }
 +        }
-         f.close();
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
-         DataInputStream in = new DataInputStream(input);
 +
-         for (int i = 0; i < sections.size(); i++)
++        try (DataInputStream in = new DataInputStream(input))
 +        {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
++            for (int i = 0; i < sections.size(); i++)
++            {
++                input.position(sections.get(i).left);
++                long readValue = in.readLong();
++                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
++            }
 +        }
 +    }
 +}


[02/15] cassandra git commit: Fix CompressedInputStream for proper cleanup

Posted by yu...@apache.org.
Fix CompressedInputStream for proper cleanup

patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161

Branch: refs/heads/cassandra-2.2
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
  * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
  * Try next replica if not possible to connect to primary replica on
    ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private byte[] buffer;
+    private final byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
+    private Thread readerThread;
+
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        readerThread = new Thread(new Reader(source, info, dataBuffer));
+        readerThread.start();
     }
 
     public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    static class Reader extends WrappedRunnable
+    class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (chunks.hasNext())
+            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
-                    if (r < 0)
+                    int r;
+                    try
+                    {
+                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        if (r < 0)
+                        {
+                            dataBuffer.put(POISON_PILL);
+                            return; // throw exception where we consume dataBuffer
+                        }
+                    }
+                    catch (IOException e)
                     {
                         dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
+                        throw e;
                     }
                     bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
+            synchronized(CompressedInputStream.this)
+            {
+                readerThread = null;
+            }
         }
     }
+
+    @Override
+    public void close() throws IOException
+    {
+        synchronized(this)
+        {
+            if (readerThread != null)
+            {
+                readerThread.interrupt();
+                readerThread = null;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            cis.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
     {
         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
     }
+
+    /**
+     * Test that CompressedInputStream does not hang when closed while reading
+     * @throws Exception
+     */
+    @Test(expected = EOFException.class)
+    public void testClose() throws Exception
+    {
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+        InputStream blockingInput = new InputStream()
+        {
+            @Override
+            public int read() throws IOException
+            {
+                try
+                {
+                    // 10-second cutoff so that a stuck read here cannot block the other tests
+                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException("Interrupted as expected", e);
+                }
+            }
+        };
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+        {
+            new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        cis.close();
+                    }
+                    catch (Exception ignore) {}
+                }
+            }).start();
+            // block here
+            cis.read();
+        }
+    }
+
     /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-        Map<Long, Long> index = new HashMap<Long, Long>();
-        for (long l = 0L; l < 1000; l++)
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        Map<Long, Long> index = new HashMap<>();
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
         {
-            index.put(l, writer.getFilePointer());
-            writer.stream.writeLong(l);
+            for (long l = 0L; l < 1000; l++)
+            {
+                index.put(l, writer.getFilePointer());
+                writer.stream.writeLong(l);
+            }
+            writer.close();
         }
-        writer.close();
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        List<Pair<Long, Long>> sections = new ArrayList<>();
         for (long l : valuesToCheck)
         {
             long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
             size += (c.length + 4); // 4bytes CRC
         byte[] toRead = new byte[size];
 
-        RandomAccessFile f = new RandomAccessFile(tmp, "r");
-        int pos = 0;
-        for (CompressionMetadata.Chunk c : chunks)
+        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
         {
-            f.seek(c.offset);
-            pos += f.read(toRead, pos, c.length + 4);
+            int pos = 0;
+            for (CompressionMetadata.Chunk c : chunks)
+            {
+                f.seek(c.offset);
+                pos += f.read(toRead, pos, c.length + 4);
+            }
         }
-        f.close();
 
         if (testTruncate)
         {
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
-        DataInputStream in = new DataInputStream(input);
 
-        for (int i = 0; i < sections.size(); i++)
+        try (DataInputStream in = new DataInputStream(input))
         {
-            input.position(sections.get(i).left);
-            long exp = in.readLong();
-            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                long readValue = in.readLong();
+                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+            }
         }
     }
 }


[03/15] cassandra git commit: Fix CompressedInputStream for proper cleanup

Posted by yu...@apache.org.
Fix CompressedInputStream for proper cleanup

patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161

Branch: refs/heads/trunk
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
  * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
  * Try next replica if not possible to connect to primary replica on
    ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private byte[] buffer;
+    private final byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
+    private Thread readerThread;
+
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        readerThread = new Thread(new Reader(source, info, dataBuffer));
+        readerThread.start();
     }
 
     public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    static class Reader extends WrappedRunnable
+    class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (chunks.hasNext())
+            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
-                    if (r < 0)
+                    int r;
+                    try
+                    {
+                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        if (r < 0)
+                        {
+                            dataBuffer.put(POISON_PILL);
+                            return; // throw exception where we consume dataBuffer
+                        }
+                    }
+                    catch (IOException e)
                     {
                         dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
+                        throw e;
                     }
                     bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
+            synchronized(CompressedInputStream.this)
+            {
+                readerThread = null;
+            }
         }
     }
+
+    @Override
+    public void close() throws IOException
+    {
+        synchronized(this)
+        {
+            if (readerThread != null)
+            {
+                readerThread.interrupt();
+                readerThread = null;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            cis.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
     {
         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
     }
+
+    /**
+     * Test that CompressedInputStream does not hang when closed while reading
+     * @throws Exception
+     */
+    @Test(expected = EOFException.class)
+    public void testClose() throws Exception
+    {
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+        InputStream blockingInput = new InputStream()
+        {
+            @Override
+            public int read() throws IOException
+            {
+                try
+                {
+                    // 10-second cutoff so that a stuck read here cannot block the other tests
+                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException("Interrupted as expected", e);
+                }
+            }
+        };
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+        {
+            new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        cis.close();
+                    }
+                    catch (Exception ignore) {}
+                }
+            }).start();
+            // block here
+            cis.read();
+        }
+    }
+
     /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-        Map<Long, Long> index = new HashMap<Long, Long>();
-        for (long l = 0L; l < 1000; l++)
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        Map<Long, Long> index = new HashMap<>();
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
         {
-            index.put(l, writer.getFilePointer());
-            writer.stream.writeLong(l);
+            for (long l = 0L; l < 1000; l++)
+            {
+                index.put(l, writer.getFilePointer());
+                writer.stream.writeLong(l);
+            }
+            writer.close();
         }
-        writer.close();
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        List<Pair<Long, Long>> sections = new ArrayList<>();
         for (long l : valuesToCheck)
         {
             long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
             size += (c.length + 4); // 4bytes CRC
         byte[] toRead = new byte[size];
 
-        RandomAccessFile f = new RandomAccessFile(tmp, "r");
-        int pos = 0;
-        for (CompressionMetadata.Chunk c : chunks)
+        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
         {
-            f.seek(c.offset);
-            pos += f.read(toRead, pos, c.length + 4);
+            int pos = 0;
+            for (CompressionMetadata.Chunk c : chunks)
+            {
+                f.seek(c.offset);
+                pos += f.read(toRead, pos, c.length + 4);
+            }
         }
-        f.close();
 
         if (testTruncate)
         {
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
-        DataInputStream in = new DataInputStream(input);
 
-        for (int i = 0; i < sections.size(); i++)
+        try (DataInputStream in = new DataInputStream(input))
         {
-            input.position(sections.get(i).left);
-            long exp = in.readLong();
-            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                long readValue = in.readLong();
+                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+            }
         }
     }
 }


[08/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe

Branch: refs/heads/cassandra-3.0
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 -2.1.12
 +2.2.4
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
      private static final byte[] POISON_PILL = new byte[0];
  
      private long totalCompressedBytesRead;
 -    private final boolean hasPostCompressionAdlerChecksums;
  
+     private Thread readerThread;
+ 
      /**
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
 +    public CompressedInputStream(InputStream source, CompressionInfo info)
      {
          this.info = info;
 -        this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
 -        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
 +        this.checksum =  new Adler32();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
      {
          testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
      }
+ 
+     /**
+      * Test that CompressedInputStream does not hang when closed while reading
 -     * @throws Exception
++     * @throws IOException
+      */
+     @Test(expected = EOFException.class)
 -    public void testClose() throws Exception
++    public void testClose() throws IOException
+     {
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+         InputStream blockingInput = new InputStream()
+         {
+             @Override
+             public int read() throws IOException
+             {
+                 try
+                 {
+                     // 10-second cutoff so that a stuck read here cannot block the other tests
+                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                 }
+                 catch (InterruptedException e)
+                 {
+                     throw new IOException("Interrupted as expected", e);
+                 }
+             }
+         };
+         CompressionInfo info = new CompressionInfo(chunks, param);
 -        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+         {
+             new Thread(new Runnable()
+             {
+                 @Override
+                 public void run()
+                 {
+                     try
+                     {
+                         cis.close();
+                     }
+                     catch (Exception ignore) {}
+                 }
+             }).start();
+             // block here
+             cis.read();
+         }
+     }
+ 
      /**
       * @param valuesToCheck array of longs of range(0-999)
       * @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
          File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
          Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
          MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-         Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         Map<Long, Long> index = new HashMap<>();
+         try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
          {
-             index.put(l, writer.getFilePointer());
-             writer.stream.writeLong(l);
+             for (long l = 0L; l < 1000; l++)
+             {
+                 index.put(l, writer.getFilePointer());
+                 writer.stream.writeLong(l);
+             }
 -            writer.close();
++            writer.finish();
          }
-         writer.finish();
  
          CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+         List<Pair<Long, Long>> sections = new ArrayList<>();
          for (long l : valuesToCheck)
          {
              long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
  
          // read buffer using CompressedInputStream
          CompressionInfo info = new CompressionInfo(chunks, param);
 -        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
-         DataInputStream in = new DataInputStream(input);
  
-         for (int i = 0; i < sections.size(); i++)
+         try (DataInputStream in = new DataInputStream(input))
          {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+             for (int i = 0; i < sections.size(); i++)
+             {
+                 input.position(sections.get(i).left);
+                 long readValue = in.readLong();
+                 assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+             }
          }
      }
  }


[10/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b3cfae4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b3cfae4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b3cfae4

Branch: refs/heads/trunk
Commit: 0b3cfae4e619d1ece5ff8afc774eeb52b93166d8
Parents: 9fe790d 056055f
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:58:58 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:58:58 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 ++++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compression/CompressedInputStreamTest.java  | 88 ++++++++++++++++----
 4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4182cc1,d11be26..608d8f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -11,7 -3,16 +11,8 @@@ Merged from 2.2
   * Fix SimpleDateType type compatibility (CASSANDRA-10027)
   * (Hadoop) fix splits calculation (CASSANDRA-10640)
   * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 - * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 - * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 - * Expose phi values from failure detector via JMX and tweak debug
 -   and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index ccd0ac5,daa339a..56dc63a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -42,10 -41,9 +42,10 @@@ public class CompressedInputStream exte
      private final CompressionInfo info;
      // chunk buffer
      private final BlockingQueue<byte[]> dataBuffer;
 +    private final Supplier<Double> crcCheckChanceSupplier;
  
      // uncompressed bytes
-     private byte[] buffer;
+     private final byte[] buffer;
  
      // offset from the beginning of the buffer
      protected long bufferOffset = 0;
@@@ -67,16 -67,16 +69,17 @@@
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info)
 +    public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
      {
          this.info = info;
 -        this.checksum =  new Adler32();
 +        this.checksum =  checksumType.newInstance();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 +        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 5646592,0000000..2162e32
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,129 -1,0 +1,183 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.streaming.compression;
 +
 +import java.io.*;
 +import java.util.*;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Test;
 +import org.apache.cassandra.db.ClusteringComparator;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.schema.CompressionParams;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
 +import org.apache.cassandra.streaming.compress.CompressionInfo;
 +import org.apache.cassandra.utils.ChecksumType;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + */
 +public class CompressedInputStreamTest
 +{
 +    @Test
 +    public void testCompressedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{0L}, false);
 +        testCompressedReadWith(new long[]{1L}, false);
 +        testCompressedReadWith(new long[]{100L}, false);
 +
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
 +    }
 +
 +    @Test(expected = EOFException.class)
 +    public void testTruncatedRead() throws Exception
 +    {
 +        testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
 +    }
++
++    /**
++     * Test CompressedInputStream not hang when closed while reading
++     * @throws IOException
++     */
++    @Test(expected = EOFException.class)
++    public void testClose() throws IOException
++    {
++        CompressionParams param = CompressionParams.snappy(32);
++        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
++        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
++        InputStream blockingInput = new InputStream()
++        {
++            @Override
++            public int read() throws IOException
++            {
++                try
++                {
++                    // 10 second cut off not to stop other test in case
++                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
++                }
++                catch (InterruptedException e)
++                {
++                    throw new IOException("Interrupted as expected", e);
++                }
++            }
++        };
++        CompressionInfo info = new CompressionInfo(chunks, param);
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
++        {
++            new Thread(new Runnable()
++            {
++                @Override
++                public void run()
++                {
++                    try
++                    {
++                        cis.close();
++                    }
++                    catch (Exception ignore) {}
++                }
++            }).start();
++            // block here
++            cis.read();
++        }
++    }
++
 +    /**
 +     * @param valuesToCheck array of longs of range(0-999)
 +     * @throws Exception
 +     */
 +    private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
 +    {
 +        assert valuesToCheck != null && valuesToCheck.length > 0;
 +
 +        // write compressed data file of longs
 +        File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
 +        Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
 +        MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
 +        CompressionParams param = CompressionParams.snappy(32);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
 +        Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
++        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
 +        {
-             index.put(l, writer.position());
-             writer.writeLong(l);
++            for (long l = 0L; l < 1000; l++)
++            {
++                index.put(l, writer.position());
++                writer.writeLong(l);
++            }
++            writer.finish();
 +        }
-         writer.finish();
 +
 +        CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
++        List<Pair<Long, Long>> sections = new ArrayList<>();
 +        for (long l : valuesToCheck)
 +        {
 +            long position = index.get(l);
 +            sections.add(Pair.create(position, position + 8));
 +        }
 +        CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
 +        long totalSize = comp.getTotalSizeForSections(sections);
 +        long expectedSize = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            expectedSize += c.length + 4;
 +        assertEquals(expectedSize, totalSize);
 +
 +        // buffer up only relevant parts of file
 +        int size = 0;
 +        for (CompressionMetadata.Chunk c : chunks)
 +            size += (c.length + 4); // 4bytes CRC
 +        byte[] toRead = new byte[size];
 +
-         RandomAccessFile f = new RandomAccessFile(tmp, "r");
-         int pos = 0;
-         for (CompressionMetadata.Chunk c : chunks)
++        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
 +        {
-             f.seek(c.offset);
-             pos += f.read(toRead, pos, c.length + 4);
++            int pos = 0;
++            for (CompressionMetadata.Chunk c : chunks)
++            {
++                f.seek(c.offset);
++                pos += f.read(toRead, pos, c.length + 4);
++            }
 +        }
-         f.close();
 +
 +        if (testTruncate)
 +        {
 +            byte [] actuallyRead = new byte[50];
 +            System.arraycopy(toRead, 0, actuallyRead, 0, 50);
 +            toRead = actuallyRead;
 +        }
 +
 +        // read buffer using CompressedInputStream
 +        CompressionInfo info = new CompressionInfo(chunks, param);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
 +                                                                ChecksumType.CRC32, () -> 1.0);
-         DataInputStream in = new DataInputStream(input);
 +
-         for (int i = 0; i < sections.size(); i++)
++        try (DataInputStream in = new DataInputStream(input))
 +        {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
++            for (int i = 0; i < sections.size(); i++)
++            {
++                input.position(sections.get(i).left);
++                long readValue = in.readLong();
++                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
++            }
 +        }
 +    }
 +}


[07/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe

Branch: refs/heads/cassandra-2.2
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
 -2.1.12
 +2.2.4
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
   * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
   * Try next replica if not possible to connect to primary replica on
     ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
      private static final byte[] POISON_PILL = new byte[0];
  
      private long totalCompressedBytesRead;
 -    private final boolean hasPostCompressionAdlerChecksums;
  
+     private Thread readerThread;
+ 
      /**
       * @param source Input source to read compressed data from
       * @param info Compression info
       */
 -    public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
 +    public CompressedInputStream(InputStream source, CompressionInfo info)
      {
          this.info = info;
 -        this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
 -        this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
 +        this.checksum =  new Adler32();
          this.buffer = new byte[info.parameters.chunkLength()];
          // buffer is limited to store up to 1024 chunks
-         this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+         this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
  
-         new Thread(new Reader(source, info, dataBuffer)).start();
+         readerThread = new Thread(new Reader(source, info, dataBuffer));
+         readerThread.start();
      }
  
      public int read() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
      {
          testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
      }
+ 
+     /**
+      * Test CompressedInputStream not hang when closed while reading
 -     * @throws Exception
++     * @throws IOException
+      */
+     @Test(expected = EOFException.class)
 -    public void testClose() throws Exception
++    public void testClose() throws IOException
+     {
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+         final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+         InputStream blockingInput = new InputStream()
+         {
+             @Override
+             public int read() throws IOException
+             {
+                 try
+                 {
+                     // 10 second cut off not to stop other test in case
+                     return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                 }
+                 catch (InterruptedException e)
+                 {
+                     throw new IOException("Interrupted as expected", e);
+                 }
+             }
+         };
+         CompressionInfo info = new CompressionInfo(chunks, param);
 -        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+         {
+             new Thread(new Runnable()
+             {
+                 @Override
+                 public void run()
+                 {
+                     try
+                     {
+                         cis.close();
+                     }
+                     catch (Exception ignore) {}
+                 }
+             }).start();
+             // block here
+             cis.read();
+         }
+     }
+ 
      /**
       * @param valuesToCheck array of longs of range(0-999)
       * @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
          File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
          Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
          MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-         CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-         Map<Long, Long> index = new HashMap<Long, Long>();
-         for (long l = 0L; l < 1000; l++)
+         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+         Map<Long, Long> index = new HashMap<>();
+         try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
          {
-             index.put(l, writer.getFilePointer());
-             writer.stream.writeLong(l);
+             for (long l = 0L; l < 1000; l++)
+             {
+                 index.put(l, writer.getFilePointer());
+                 writer.stream.writeLong(l);
+             }
 -            writer.close();
++            writer.finish();
          }
-         writer.finish();
  
          CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-         List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+         List<Pair<Long, Long>> sections = new ArrayList<>();
          for (long l : valuesToCheck)
          {
              long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
  
          // read buffer using CompressedInputStream
          CompressionInfo info = new CompressionInfo(chunks, param);
 -        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
 +        CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
-         DataInputStream in = new DataInputStream(input);
  
-         for (int i = 0; i < sections.size(); i++)
+         try (DataInputStream in = new DataInputStream(input))
          {
-             input.position(sections.get(i).left);
-             long readValue = in.readLong();
-             assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+             for (int i = 0; i < sections.size(); i++)
+             {
+                 input.position(sections.get(i).left);
+                 long readValue = in.readLong();
+                 assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+             }
          }
      }
  }