Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/23 21:00:41 UTC
[03/15] cassandra git commit: Fix CompressedInputStream for proper cleanup
Fix CompressedInputStream for proper cleanup
patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012
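
What the patch addresses: CompressedInputStream starts a background Reader
thread that pulls compressed chunks from the source into a bounded
BlockingQueue, but nothing ever stopped that thread, so abandoning the
stream mid-transfer could leave it blocked forever on source.read() or
dataBuffer.put(). The fix keeps a handle to the thread and interrupts it
from a new close() method. A minimal, self-contained sketch of the pattern
follows; the names and buffer sizes are illustrative and do not match the
actual Cassandra classes.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BackgroundChunkStream extends InputStream
{
    private static final byte[] POISON_PILL = new byte[0];

    private final BlockingQueue<byte[]> chunks = new ArrayBlockingQueue<>(16);
    private byte[] current = new byte[0];
    private int pos = 0;
    private Thread readerThread;

    BackgroundChunkStream(final InputStream source)
    {
        readerThread = new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    byte[] buf = new byte[4096];
                    // close() signals us through the interrupt flag
                    while (!Thread.currentThread().isInterrupted())
                    {
                        int n;
                        try
                        {
                            n = source.read(buf);
                        }
                        catch (IOException e)
                        {
                            chunks.put(POISON_PILL); // surface the failure at the consumer
                            return;
                        }
                        if (n < 0)
                        {
                            chunks.put(POISON_PILL); // normal EOF
                            return;
                        }
                        byte[] copy = new byte[n];
                        System.arraycopy(buf, 0, copy, 0, n);
                        chunks.put(copy); // bounded queue: put() can block, but it is interruptible
                    }
                }
                catch (InterruptedException e)
                {
                    // interrupted by close() while blocked in put(): just exit
                }
                finally
                {
                    synchronized (BackgroundChunkStream.this)
                    {
                        readerThread = null; // finished; nothing left to interrupt
                    }
                }
            }
        });
        readerThread.start();
    }

    @Override
    public int read() throws IOException
    {
        if (current == POISON_PILL)
            throw new EOFException();
        if (pos == current.length)
        {
            try
            {
                current = chunks.take();
            }
            catch (InterruptedException e)
            {
                throw new IOException(e);
            }
            pos = 0;
            if (current == POISON_PILL)
                throw new EOFException();
        }
        return current[pos++] & 0xff;
    }

    @Override
    public synchronized void close()
    {
        if (readerThread != null)
        {
            readerThread.interrupt(); // wakes a put() and trips the loop check
            readerThread = null;
        }
    }
}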
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161
Branch: refs/heads/trunk
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
private final BlockingQueue<byte[]> dataBuffer;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
private long totalCompressedBytesRead;
private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
- static class Reader extends WrappedRunnable
+ class Reader extends WrappedRunnable
{
private final InputStream source;
private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
protected void runMayThrow() throws Exception
{
byte[] compressedWithCRC;
- while (chunks.hasNext())
+ while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
{
CompressionMetadata.Chunk chunk = chunks.next();
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
int bufferRead = 0;
while (bufferRead < readLength)
{
- int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
- if (r < 0)
+ int r;
+ try
+ {
+ r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+ if (r < 0)
+ {
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
+ }
+ catch (IOException e)
{
dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
+ throw e;
}
bufferRead += r;
}
dataBuffer.put(compressedWithCRC);
}
+ synchronized(CompressedInputStream.this)
+ {
+ readerThread = null;
+ }
}
}
+
+ @Override
+ public void close() throws IOException
+ {
+ synchronized(this)
+ {
+ if (readerThread != null)
+ {
+ readerThread.interrupt();
+ readerThread = null;
+ }
+ }
+ }
+
}
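
Two details in the hunks above are easy to miss. First, Thread.interrupt()
reliably unblocks the interruptible dataBuffer.put(), and the new
isInterrupted() check catches the flag between chunks, but a plain
source.read() parked on a socket is not guaranteed to be interruptible, so
the thread may only stop at the next queue operation or loop check. Second,
on EOF or error the reader enqueues POISON_PILL instead of throwing, so the
failure surfaces on the thread that consumes dataBuffer. That consumer is
not part of this diff; a hypothetical sketch of its handling, assuming the
dataBuffer and POISON_PILL members of the patched class plus
java.io.EOFException and java.io.IOException:

    // Hypothetical consumer side, mirroring the "throw exception where
    // we consume dataBuffer" comment in the hunk above.
    private byte[] nextCompressedChunk() throws IOException
    {
        try
        {
            byte[] chunk = dataBuffer.take();
            if (chunk == POISON_PILL)
                throw new EOFException("reader thread hit EOF or an I/O error mid-chunk");
            return chunk;
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }
    }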
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
else
throw Throwables.propagate(e);
}
+ finally
+ {
+ cis.close();
+ }
}
@Override
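
The CompressedStreamReader change is the other half of the contract: the
stream is now closed in a finally block, so the background thread is
interrupted whether the transfer succeeds, fails a checksum, or rethrows.
A hedged sketch of the shape, with placeholder drain logic standing in for
the real SSTable writing:

import java.io.IOException;
import java.io.InputStream;

final class DrainSketch
{
    // Placeholder for the body of CompressedStreamReader.read(): however
    // the loop exits (success, checksum failure, rethrow), close() runs
    // and the stream's background reader thread is interrupted.
    static long drain(InputStream cis) throws IOException
    {
        try
        {
            long total = 0;
            byte[] buf = new byte[8192];
            for (int n; (n = cis.read(buf)) >= 0; )
                total += n;
            return total;
        }
        finally
        {
            cis.close();
        }
    }
}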
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.streaming.compress;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test that CompressedInputStream does not hang when closed while reading
+ * @throws Exception
+ */
+ @Test(expected = EOFException.class)
+ public void testClose() throws Exception
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut-off so a hang here cannot hold up the other tests
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs in the range 0-999
* @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
+ writer.close();
}
- writer.close();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
size += (c.length + 4); // 4 bytes CRC
byte[] toRead = new byte[size];
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
+ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
{
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
+ int pos = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ {
+ f.seek(c.offset);
+ pos += f.read(toRead, pos, c.length + 4);
+ }
}
- f.close();
if (testTruncate)
{
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long exp = in.readLong();
- assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
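
The new testClose case above distills the technique: a fake InputStream
whose read() parks on an interruptible SynchronousQueue.poll(), a second
thread calling close(), and an expected EOFException proving the blocked
read was released. The same shape can be run against the illustrative
BackgroundChunkStream sketch from earlier, again with only JDK types and
hypothetical names:

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class CloseWhileBlockedDemo
{
    public static void main(String[] args) throws Exception
    {
        // A source whose read() parks on an interruptible queue and is
        // never fed, like the test's blockingInput above.
        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
        InputStream source = new InputStream()
        {
            @Override
            public int read() throws IOException
            {
                try
                {
                    Integer b = blocker.poll(10, TimeUnit.SECONDS); // safety cut-off
                    if (b == null)
                        throw new IOException("timed out");
                    return b;
                }
                catch (InterruptedException e)
                {
                    throw new IOException("interrupted as expected", e);
                }
            }
        };

        final BackgroundChunkStream cis = new BackgroundChunkStream(source);
        new Thread(new Runnable()
        {
            public void run()
            {
                // brief delay so the reader thread is parked in read()
                try { Thread.sleep(100); } catch (InterruptedException ignore) {}
                cis.close(); // interrupt propagates: reader enqueues POISON_PILL
            }
        }).start();

        try
        {
            cis.read(); // blocked forever before the fix
        }
        catch (EOFException expected)
        {
            System.out.println("read() was released: " + expected);
        }
    }
}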