You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/23 21:00:42 UTC

[04/15] cassandra git commit: Fix CompressedInputStream for proper cleanup

Fix CompressedInputStream for proper cleanup

patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161

Branch: refs/heads/cassandra-3.0
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedInputStream.java         | 46 +++++++--
 .../compress/CompressedStreamReader.java        |  4 +
 .../compress/CompressedInputStreamTest.java     | 98 +++++++++++++++-----
 4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
  * (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
  * Try next replica if not possible to connect to primary replica on
    ColumnFamilyRecordReader (CASSANDRA-2388)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
     private final BlockingQueue<byte[]> dataBuffer;
 
     // uncompressed bytes
-    private byte[] buffer;
+    private final byte[] buffer;
 
     // offset from the beginning of the buffer
     protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
     private long totalCompressedBytesRead;
     private final boolean hasPostCompressionAdlerChecksums;
 
+    private Thread readerThread;
+
     /**
      * @param source Input source to read compressed data from
      * @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.buffer = new byte[info.parameters.chunkLength()];
         // buffer is limited to store up to 1024 chunks
-        this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+        this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        readerThread = new Thread(new Reader(source, info, dataBuffer));
+        readerThread.start();
     }
 
     public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
         return totalCompressedBytesRead;
     }
 
-    static class Reader extends WrappedRunnable
+    class Reader extends WrappedRunnable
     {
         private final InputStream source;
         private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
         protected void runMayThrow() throws Exception
         {
             byte[] compressedWithCRC;
-            while (chunks.hasNext())
+            while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
             {
                 CompressionMetadata.Chunk chunk = chunks.next();
 
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
                 int bufferRead = 0;
                 while (bufferRead < readLength)
                 {
-                    int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
-                    if (r < 0)
+                    int r;
+                    try
+                    {
+                        r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+                        if (r < 0)
+                        {
+                            dataBuffer.put(POISON_PILL);
+                            return; // throw exception where we consume dataBuffer
+                        }
+                    }
+                    catch (IOException e)
                     {
                         dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
+                        throw e;
                     }
                     bufferRead += r;
                 }
                 dataBuffer.put(compressedWithCRC);
             }
+            synchronized(CompressedInputStream.this)
+            {
+                readerThread = null;
+            }
         }
     }
+
+    @Override
+    public void close() throws IOException
+    {
+        synchronized(this)
+        {
+            if (readerThread != null)
+            {
+                readerThread.interrupt();
+                readerThread = null;
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
             else
                 throw Throwables.propagate(e);
         }
+        finally
+        {
+            cis.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
 import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
     {
         testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
     }
+
+    /**
+     * Test that CompressedInputStream does not hang when closed while reading
+     * @throws Exception
+     */
+    @Test(expected = EOFException.class)
+    public void testClose() throws Exception
+    {
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+        InputStream blockingInput = new InputStream()
+        {
+            @Override
+            public int read() throws IOException
+            {
+                try
+                {
+                    // cut off after 10 seconds so that a missed interrupt cannot stall the other tests
+                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException("Interrupted as expected", e);
+                }
+            }
+        };
+        CompressionInfo info = new CompressionInfo(chunks, param);
+        try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+        {
+            new Thread(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        cis.close();
+                    }
+                    catch (Exception ignore) {}
+                }
+            }).start();
+            // block here
+            cis.read();
+        }
+    }
+
     /**
      * @param valuesToCheck array of longs of range(0-999)
      * @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
         File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
-        Map<Long, Long> index = new HashMap<Long, Long>();
-        for (long l = 0L; l < 1000; l++)
+        CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+        Map<Long, Long> index = new HashMap<>();
+        try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
         {
-            index.put(l, writer.getFilePointer());
-            writer.stream.writeLong(l);
+            for (long l = 0L; l < 1000; l++)
+            {
+                index.put(l, writer.getFilePointer());
+                writer.stream.writeLong(l);
+            }
+            writer.close();
         }
-        writer.close();
 
         CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
-        List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+        List<Pair<Long, Long>> sections = new ArrayList<>();
         for (long l : valuesToCheck)
         {
             long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
             size += (c.length + 4); // 4bytes CRC
         byte[] toRead = new byte[size];
 
-        RandomAccessFile f = new RandomAccessFile(tmp, "r");
-        int pos = 0;
-        for (CompressionMetadata.Chunk c : chunks)
+        try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
         {
-            f.seek(c.offset);
-            pos += f.read(toRead, pos, c.length + 4);
+            int pos = 0;
+            for (CompressionMetadata.Chunk c : chunks)
+            {
+                f.seek(c.offset);
+                pos += f.read(toRead, pos, c.length + 4);
+            }
         }
-        f.close();
 
         if (testTruncate)
         {
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
         // read buffer using CompressedInputStream
         CompressionInfo info = new CompressionInfo(chunks, param);
         CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
-        DataInputStream in = new DataInputStream(input);
 
-        for (int i = 0; i < sections.size(); i++)
+        try (DataInputStream in = new DataInputStream(input))
         {
-            input.position(sections.get(i).left);
-            long exp = in.readLong();
-            assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+            for (int i = 0; i < sections.size(); i++)
+            {
+                input.position(sections.get(i).left);
+                long readValue = in.readLong();
+                assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+            }
         }
     }
 }