Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/23 21:00:39 UTC
[01/15] cassandra git commit: Fix CompressedInputStream for proper cleanup
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 4a94f75b0 -> 8b9a9161c
refs/heads/cassandra-2.2 2aa834265 -> 056055feb
refs/heads/cassandra-3.0 9fe790d75 -> 0b3cfae4e
refs/heads/cassandra-3.1 e0c945228 -> e8737fda3
refs/heads/trunk 440366edd -> fa4c17383
Fix CompressedInputStream for proper cleanup
patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161
Branch: refs/heads/cassandra-2.1
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
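In outline, the change keeps a handle to the background Reader thread so that close() can interrupt it, and the reader signals failure or end-of-stream to the consumer with a zero-length "poison pill" instead of leaving the queue empty. Below is a minimal, self-contained sketch of that pattern with hypothetical names (it is not the Cassandra class itself, and it simplifies away decompression and checksumming):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class InterruptibleChunkReader implements AutoCloseable
{
    private static final byte[] POISON_PILL = new byte[0];

    private final BlockingQueue<byte[]> dataBuffer = new ArrayBlockingQueue<>(1024);
    private Thread readerThread;

    InterruptibleChunkReader(final InputStream source, final int chunkLength)
    {
        readerThread = new Thread(() ->
        {
            try
            {
                while (!Thread.currentThread().isInterrupted())
                {
                    byte[] chunk = new byte[chunkLength];
                    // For brevity the sketch ignores short reads; the real code loops
                    // until the whole chunk (plus its CRC) has been read.
                    int r = source.read(chunk);
                    if (r < 0)
                    {
                        dataBuffer.put(POISON_PILL); // EOF: let the consumer raise the exception
                        return;
                    }
                    dataBuffer.put(chunk); // may block on a full queue; interrupt() unblocks it
                }
            }
            catch (IOException | InterruptedException e)
            {
                // Signal the consumer without blocking again (the interrupt flag may be set).
                dataBuffer.offer(POISON_PILL);
            }
            synchronized (InterruptibleChunkReader.this)
            {
                readerThread = null; // nothing left to interrupt
            }
        });
        readerThread.start();
    }

    // Consumer side: turn the poison pill into an EOFException instead of hanging.
    byte[] nextChunk() throws IOException
    {
        final byte[] chunk;
        try
        {
            chunk = dataBuffer.take();
        }
        catch (InterruptedException e)
        {
            throw new EOFException("interrupted while waiting for a chunk");
        }
        if (chunk == POISON_PILL)
            throw new EOFException("source failed or was closed before all chunks arrived");
        return chunk;
    }

    @Override
    public synchronized void close()
    {
        if (readerThread != null)
        {
            readerThread.interrupt(); // stop the loop and unblock a put() on a full queue
            readerThread = null;
        }
    }
}

When the blocked source reacts to the interrupt (as the test's SynchronousQueue-backed InputStream does), the reader enqueues the poison pill and a consumer blocked in nextChunk() fails with an EOFException rather than hanging, which is what the new close() and the testClose() unit test below exercise.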
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
private final BlockingQueue<byte[]> dataBuffer;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
private long totalCompressedBytesRead;
private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
- static class Reader extends WrappedRunnable
+ class Reader extends WrappedRunnable
{
private final InputStream source;
private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
protected void runMayThrow() throws Exception
{
byte[] compressedWithCRC;
- while (chunks.hasNext())
+ while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
{
CompressionMetadata.Chunk chunk = chunks.next();
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
int bufferRead = 0;
while (bufferRead < readLength)
{
- int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
- if (r < 0)
+ int r;
+ try
+ {
+ r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+ if (r < 0)
+ {
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
+ }
+ catch (IOException e)
{
dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
+ throw e;
}
bufferRead += r;
}
dataBuffer.put(compressedWithCRC);
}
+ synchronized(CompressedInputStream.this)
+ {
+ readerThread = null;
+ }
}
}
+
+ @Override
+ public void close() throws IOException
+ {
+ synchronized(this)
+ {
+ if (readerThread != null)
+ {
+ readerThread.interrupt();
+ readerThread = null;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
else
throw Throwables.propagate(e);
}
+ finally
+ {
+ cis.close();
+ }
}
@Override
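The hunk above is the caller-side half of the fix: whatever happens while draining the stream, the finally block runs so the background reader thread is interrupted. Using the hypothetical sketch from earlier, the same shape looks like:

void drain(InputStream networkIn) throws IOException
{
    // 64 KiB is a hypothetical chunk length for illustration only.
    InterruptibleChunkReader cis = new InterruptibleChunkReader(networkIn, 64 * 1024);
    try
    {
        byte[] chunk = cis.nextChunk();
        // ... decompress, verify and write out the chunk ...
    }
    finally
    {
        cis.close(); // always tear down the reader thread, even on exceptions
    }
}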
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.streaming.compress;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test CompressedInputStream not hang when closed while reading
+ * @throws Exception
+ */
+ @Test(expected = EOFException.class)
+ public void testClose() throws Exception
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut off not to stop other test in case
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
+ writer.close();
}
- writer.close();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
size += (c.length + 4); // 4bytes CRC
byte[] toRead = new byte[size];
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
+ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
{
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
+ int pos = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ {
+ f.seek(c.offset);
+ pos += f.read(toRead, pos, c.length + 4);
+ }
}
- f.close();
if (testTruncate)
{
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long exp = in.readLong();
- assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
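The testClose() test above relies on one JDK behaviour: a thread blocked in SynchronousQueue.poll() is woken by Thread.interrupt() with an InterruptedException, which the blocking InputStream then maps to an IOException. A tiny standalone demo of just that mechanism (not part of the patch):

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class InterruptPollDemo
{
    public static void main(String[] args) throws Exception
    {
        final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
        Thread reader = new Thread(() ->
        {
            try
            {
                blocker.poll(10, TimeUnit.SECONDS);
                System.out.println("poll returned without interruption");
            }
            catch (InterruptedException e)
            {
                System.out.println("poll interrupted, as the test expects");
            }
        });
        reader.start();
        Thread.sleep(100);   // give the reader time to block
        reader.interrupt();  // what CompressedInputStream.close() does to its reader thread
        reader.join();
    }
}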
[13/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8737fda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8737fda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8737fda
Branch: refs/heads/cassandra-3.1
Commit: e8737fda3aea7ac28fe04c02fa687d2606b8d6c8
Parents: e0c9452 0b3cfae
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:59:04 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:59:04 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 ++++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compression/CompressedInputStreamTest.java | 88 ++++++++++++++++----
4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8737fda/CHANGES.txt
----------------------------------------------------------------------
[06/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe
Branch: refs/heads/trunk
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
-2.1.12
+2.2.4
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
private static final byte[] POISON_PILL = new byte[0];
private long totalCompressedBytesRead;
- private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
+ public CompressedInputStream(InputStream source, CompressionInfo info)
{
this.info = info;
- this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
- this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+ this.checksum = new Adler32();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test CompressedInputStream not hang when closed while reading
- * @throws Exception
++ * @throws IOException
+ */
+ @Test(expected = EOFException.class)
- public void testClose() throws Exception
++ public void testClose() throws IOException
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut off not to stop other test in case
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
- try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
- writer.close();
++ writer.finish();
}
- writer.finish();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
- CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long readValue = in.readLong();
- assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
[11/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b3cfae4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b3cfae4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b3cfae4
Branch: refs/heads/cassandra-3.1
Commit: 0b3cfae4e619d1ece5ff8afc774eeb52b93166d8
Parents: 9fe790d 056055f
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:58:58 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:58:58 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 ++++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compression/CompressedInputStreamTest.java | 88 ++++++++++++++++----
4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4182cc1,d11be26..608d8f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -11,7 -3,16 +11,8 @@@ Merged from 2.2
* Fix SimpleDateType type compatibility (CASSANDRA-10027)
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
- * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
- * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
- * Expose phi values from failure detector via JMX and tweak debug
- and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index ccd0ac5,daa339a..56dc63a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -42,10 -41,9 +42,10 @@@ public class CompressedInputStream exte
private final CompressionInfo info;
// chunk buffer
private final BlockingQueue<byte[]> dataBuffer;
+ private final Supplier<Double> crcCheckChanceSupplier;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@@ -67,16 -67,16 +69,17 @@@
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info)
+ public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
{
this.info = info;
- this.checksum = new Adler32();
+ this.checksum = checksumType.newInstance();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+ this.crcCheckChanceSupplier = crcCheckChanceSupplier;
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 5646592,0000000..2162e32
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,129 -1,0 +1,183 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compression;
+
+import java.io.*;
+import java.util.*;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.streaming.compress.CompressedInputStream;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ */
+public class CompressedInputStreamTest
+{
+ @Test
+ public void testCompressedRead() throws Exception
+ {
+ testCompressedReadWith(new long[]{0L}, false);
+ testCompressedReadWith(new long[]{1L}, false);
+ testCompressedReadWith(new long[]{100L}, false);
+
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
+ }
+
+ @Test(expected = EOFException.class)
+ public void testTruncatedRead() throws Exception
+ {
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
+ }
++
++ /**
++ * Test CompressedInputStream not hang when closed while reading
++ * @throws IOException
++ */
++ @Test(expected = EOFException.class)
++ public void testClose() throws IOException
++ {
++ CompressionParams param = CompressionParams.snappy(32);
++ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
++ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
++ InputStream blockingInput = new InputStream()
++ {
++ @Override
++ public int read() throws IOException
++ {
++ try
++ {
++ // 10 second cut off not to stop other test in case
++ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
++ }
++ catch (InterruptedException e)
++ {
++ throw new IOException("Interrupted as expected", e);
++ }
++ }
++ };
++ CompressionInfo info = new CompressionInfo(chunks, param);
++ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
++ {
++ new Thread(new Runnable()
++ {
++ @Override
++ public void run()
++ {
++ try
++ {
++ cis.close();
++ }
++ catch (Exception ignore) {}
++ }
++ }).start();
++ // block here
++ cis.read();
++ }
++ }
++
+ /**
+ * @param valuesToCheck array of longs of range(0-999)
+ * @throws Exception
+ */
+ private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
+ {
+ assert valuesToCheck != null && valuesToCheck.length > 0;
+
+ // write compressed data file of longs
+ File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
+ Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
+ MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+ CompressionParams param = CompressionParams.snappy(32);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
+ Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
++ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
+ {
- index.put(l, writer.position());
- writer.writeLong(l);
++ for (long l = 0L; l < 1000; l++)
++ {
++ index.put(l, writer.position());
++ writer.writeLong(l);
++ }
++ writer.finish();
+ }
- writer.finish();
+
+ CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
++ List<Pair<Long, Long>> sections = new ArrayList<>();
+ for (long l : valuesToCheck)
+ {
+ long position = index.get(l);
+ sections.add(Pair.create(position, position + 8));
+ }
+ CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
+ long totalSize = comp.getTotalSizeForSections(sections);
+ long expectedSize = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ expectedSize += c.length + 4;
+ assertEquals(expectedSize, totalSize);
+
+ // buffer up only relevant parts of file
+ int size = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ size += (c.length + 4); // 4bytes CRC
+ byte[] toRead = new byte[size];
+
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
++ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
+ {
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
++ int pos = 0;
++ for (CompressionMetadata.Chunk c : chunks)
++ {
++ f.seek(c.offset);
++ pos += f.read(toRead, pos, c.length + 4);
++ }
+ }
- f.close();
+
+ if (testTruncate)
+ {
+ byte [] actuallyRead = new byte[50];
+ System.arraycopy(toRead, 0, actuallyRead, 0, 50);
+ toRead = actuallyRead;
+ }
+
+ // read buffer using CompressedInputStream
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
+ ChecksumType.CRC32, () -> 1.0);
- DataInputStream in = new DataInputStream(input);
+
- for (int i = 0; i < sections.size(); i++)
++ try (DataInputStream in = new DataInputStream(input))
+ {
- input.position(sections.get(i).left);
- long readValue = in.readLong();
- assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
++ for (int i = 0; i < sections.size(); i++)
++ {
++ input.position(sections.get(i).left);
++ long readValue = in.readLong();
++ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
++ }
+ }
+ }
+}
[15/15] cassandra git commit: Merge branch 'cassandra-3.1' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-3.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fa4c1738
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fa4c1738
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fa4c1738
Branch: refs/heads/trunk
Commit: fa4c17383a168d6eb3a84eac92d98dadcd4ba373
Parents: 440366e e8737fd
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 14:00:19 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 14:00:19 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 ++++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compression/CompressedInputStreamTest.java | 88 ++++++++++++++++----
4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa4c1738/CHANGES.txt
----------------------------------------------------------------------
[14/15] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.1
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8737fda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8737fda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8737fda
Branch: refs/heads/trunk
Commit: e8737fda3aea7ac28fe04c02fa687d2606b8d6c8
Parents: e0c9452 0b3cfae
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:59:04 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:59:04 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 ++++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compression/CompressedInputStreamTest.java | 88 ++++++++++++++++----
4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8737fda/CHANGES.txt
----------------------------------------------------------------------
[09/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe
Branch: refs/heads/cassandra-3.1
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
-2.1.12
+2.2.4
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
private static final byte[] POISON_PILL = new byte[0];
private long totalCompressedBytesRead;
- private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
+ public CompressedInputStream(InputStream source, CompressionInfo info)
{
this.info = info;
- this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
- this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+ this.checksum = new Adler32();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test CompressedInputStream not hang when closed while reading
- * @throws Exception
++ * @throws IOException
+ */
+ @Test(expected = EOFException.class)
- public void testClose() throws Exception
++ public void testClose() throws IOException
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut off not to stop other test in case
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
- try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
- writer.close();
++ writer.finish();
}
- writer.finish();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
- CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long readValue = in.readLong();
- assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
[05/15] cassandra git commit: Fix CompressedInputStream for proper cleanup
Posted by yu...@apache.org.
Fix CompressedInputStream for proper cleanup
patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161
Branch: refs/heads/cassandra-3.1
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
private final BlockingQueue<byte[]> dataBuffer;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
private long totalCompressedBytesRead;
private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
- static class Reader extends WrappedRunnable
+ class Reader extends WrappedRunnable
{
private final InputStream source;
private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
protected void runMayThrow() throws Exception
{
byte[] compressedWithCRC;
- while (chunks.hasNext())
+ while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
{
CompressionMetadata.Chunk chunk = chunks.next();
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
int bufferRead = 0;
while (bufferRead < readLength)
{
- int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
- if (r < 0)
+ int r;
+ try
+ {
+ r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+ if (r < 0)
+ {
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
+ }
+ catch (IOException e)
{
dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
+ throw e;
}
bufferRead += r;
}
dataBuffer.put(compressedWithCRC);
}
+ synchronized(CompressedInputStream.this)
+ {
+ readerThread = null;
+ }
}
}
+
+ @Override
+ public void close() throws IOException
+ {
+ synchronized(this)
+ {
+ if (readerThread != null)
+ {
+ readerThread.interrupt();
+ readerThread = null;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
else
throw Throwables.propagate(e);
}
+ finally
+ {
+ cis.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.streaming.compress;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test CompressedInputStream not hang when closed while reading
+ * @throws Exception
+ */
+ @Test(expected = EOFException.class)
+ public void testClose() throws Exception
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut off not to stop other test in case
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
+ writer.close();
}
- writer.close();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
size += (c.length + 4); // 4bytes CRC
byte[] toRead = new byte[size];
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
+ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
{
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
+ int pos = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ {
+ f.seek(c.offset);
+ pos += f.read(toRead, pos, c.length + 4);
+ }
}
- f.close();
if (testTruncate)
{
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long exp = in.readLong();
- assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
[04/15] cassandra git commit: Fix CompressedInputStream for proper cleanup
Posted by yu...@apache.org.
Fix CompressedInputStream for proper cleanup
patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161
Branch: refs/heads/cassandra-3.0
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
private final BlockingQueue<byte[]> dataBuffer;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
private long totalCompressedBytesRead;
private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
- static class Reader extends WrappedRunnable
+ class Reader extends WrappedRunnable
{
private final InputStream source;
private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
protected void runMayThrow() throws Exception
{
byte[] compressedWithCRC;
- while (chunks.hasNext())
+ while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
{
CompressionMetadata.Chunk chunk = chunks.next();
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
int bufferRead = 0;
while (bufferRead < readLength)
{
- int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
- if (r < 0)
+ int r;
+ try
+ {
+ r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+ if (r < 0)
+ {
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
+ }
+ catch (IOException e)
{
dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
+ throw e;
}
bufferRead += r;
}
dataBuffer.put(compressedWithCRC);
}
+ synchronized(CompressedInputStream.this)
+ {
+ readerThread = null;
+ }
}
}
+
+ @Override
+ public void close() throws IOException
+ {
+ synchronized(this)
+ {
+ if (readerThread != null)
+ {
+ readerThread.interrupt();
+ readerThread = null;
+ }
+ }
+ }
+
}
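For readers following the hunks above, here is a minimal standalone sketch of the cleanup pattern the patch introduces: the stream keeps a reference to its background reader thread, close() interrupts that thread under a lock, and the reader loop polls the interrupt flag so a closed stream cannot leave the thread blocked forever. Class and method names here (InterruptibleReaderStream, drainChunks, produceNextChunk) are illustrative only, not part of the Cassandra API.

import java.io.Closeable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch of the readerThread/close() pattern; not Cassandra code.
public class InterruptibleReaderStream implements Closeable
{
    private static final byte[] POISON_PILL = new byte[0];

    private final BlockingQueue<byte[]> dataBuffer = new ArrayBlockingQueue<>(1024);
    private Thread readerThread;

    public InterruptibleReaderStream()
    {
        readerThread = new Thread(this::drainChunks, "chunk-reader");
        readerThread.start();
    }

    private void drainChunks()
    {
        try
        {
            // Stop promptly once close() interrupts us.
            while (!Thread.currentThread().isInterrupted())
            {
                byte[] chunk = produceNextChunk();
                if (chunk == null)
                {
                    dataBuffer.put(POISON_PILL); // signal EOF to the consumer
                    break;
                }
                dataBuffer.put(chunk);
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // interrupted while blocked on put()
        }
        finally
        {
            synchronized (this)
            {
                readerThread = null; // nothing left for close() to interrupt
            }
        }
    }

    private byte[] produceNextChunk()
    {
        return null; // placeholder source; a real reader would pull from an InputStream
    }

    @Override
    public synchronized void close()
    {
        if (readerThread != null)
        {
            readerThread.interrupt();
            readerThread = null;
        }
    }
}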
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
else
throw Throwables.propagate(e);
}
+ finally
+ {
+ cis.close();
+ }
}
@Override
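The CompressedStreamReader hunk above only adds a finally block, but that is the piece that guarantees the background reader thread is torn down even when the copy fails. A minimal, hypothetical sketch of the shape of that read path (readStreamedFile and copyTo are placeholder names, not the actual Cassandra methods):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical shape of a stream reader that must always release its wrapper stream.
final class StreamReadSketch
{
    static void readStreamedFile(InputStream wrapped, OutputStream sstable) throws IOException
    {
        InputStream cis = wrapped; // stands in for new CompressedInputStream(...)
        try
        {
            copyTo(cis, sstable);
        }
        finally
        {
            // Without this, an exception mid-stream leaves the background reader thread alive.
            cis.close();
        }
    }

    private static void copyTo(InputStream in, OutputStream out) throws IOException
    {
        byte[] buf = new byte[8192];
        for (int n; (n = in.read(buf)) != -1; )
            out.write(buf, 0, n);
    }
}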
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.streaming.compress;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test CompressedInputStream not hang when closed while reading
+ * @throws Exception
+ */
+ @Test(expected = EOFException.class)
+ public void testClose() throws Exception
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut off not to stop other test in case
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
+ writer.close();
}
- writer.close();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
size += (c.length + 4); // 4bytes CRC
byte[] toRead = new byte[size];
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
+ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
{
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
+ int pos = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ {
+ f.seek(c.offset);
+ pos += f.read(toRead, pos, c.length + 4);
+ }
}
- f.close();
if (testTruncate)
{
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long exp = in.readLong();
- assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
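The testClose() case added above depends on an InputStream whose read() blocks until it is poked, plus a helper thread that closes the stream under test while the main thread is stuck in that read. The same idiom, reduced to a generic sketch (JUnit-independent; blockingInput and closerFor are illustrative names, not the committed test):

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Generic sketch of the "blocking source + close from another thread" test idiom.
final class BlockingSourceSketch
{
    // An InputStream whose read() blocks until a value is offered (or 10 seconds pass).
    static InputStream blockingInput(SynchronousQueue<Integer> blocker)
    {
        return new InputStream()
        {
            @Override
            public int read() throws IOException
            {
                try
                {
                    // The timeout keeps a broken test from hanging the whole suite;
                    // requireNonNull turns a timeout (null) into a failure instead of a bogus value.
                    return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
                }
                catch (InterruptedException e)
                {
                    throw new IOException("Interrupted as expected", e);
                }
            }
        };
    }

    // Close the stream under test from a helper thread while the caller is blocked in read().
    static Thread closerFor(InputStream streamUnderTest)
    {
        Thread t = new Thread(() -> {
            try { streamUnderTest.close(); } catch (IOException ignore) {}
        });
        t.setDaemon(true);
        return t;
    }
}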
[12/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b3cfae4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b3cfae4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b3cfae4
Branch: refs/heads/cassandra-3.0
Commit: 0b3cfae4e619d1ece5ff8afc774eeb52b93166d8
Parents: 9fe790d 056055f
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:58:58 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:58:58 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 ++++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compression/CompressedInputStreamTest.java | 88 ++++++++++++++++----
4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4182cc1,d11be26..608d8f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -11,7 -3,16 +11,8 @@@ Merged from 2.2
* Fix SimpleDateType type compatibility (CASSANDRA-10027)
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
- * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
- * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
- * Expose phi values from failure detector via JMX and tweak debug
- and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index ccd0ac5,daa339a..56dc63a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -42,10 -41,9 +42,10 @@@ public class CompressedInputStream exte
private final CompressionInfo info;
// chunk buffer
private final BlockingQueue<byte[]> dataBuffer;
+ private final Supplier<Double> crcCheckChanceSupplier;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@@ -67,16 -67,16 +69,17 @@@
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info)
+ public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
{
this.info = info;
- this.checksum = new Adler32();
+ this.checksum = checksumType.newInstance();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+ this.crcCheckChanceSupplier = crcCheckChanceSupplier;
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
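The 3.0 merge above additionally threads a Supplier<Double> for crc_check_chance into the stream constructor. How that supplier is consumed is not shown in this diff; the sketch below is only an assumption about a plausible use, sampling the value per chunk to decide whether to verify the checksum, with all names illustrative:

import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.zip.CRC32;

// Assumed illustration (not the Cassandra decompress path) of gating checksum
// validation on a crc_check_chance supplier.
final class CrcCheckChanceSketch
{
    private final Supplier<Double> crcCheckChanceSupplier;

    CrcCheckChanceSketch(Supplier<Double> crcCheckChanceSupplier)
    {
        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
    }

    boolean chunkLooksValid(byte[] compressed, int length, int expectedCrc)
    {
        // Re-reading the supplier per chunk lets a changed setting take effect mid-stream.
        double chance = crcCheckChanceSupplier.get();
        if (chance >= 1.0d || ThreadLocalRandom.current().nextDouble() < chance)
        {
            CRC32 checksum = new CRC32();
            checksum.update(compressed, 0, length);
            return (int) checksum.getValue() == expectedCrc;
        }
        return true; // verification skipped for this chunk
    }
}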
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 5646592,0000000..2162e32
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,129 -1,0 +1,183 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compression;
+
+import java.io.*;
+import java.util.*;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.streaming.compress.CompressedInputStream;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ */
+public class CompressedInputStreamTest
+{
+ @Test
+ public void testCompressedRead() throws Exception
+ {
+ testCompressedReadWith(new long[]{0L}, false);
+ testCompressedReadWith(new long[]{1L}, false);
+ testCompressedReadWith(new long[]{100L}, false);
+
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
+ }
+
+ @Test(expected = EOFException.class)
+ public void testTruncatedRead() throws Exception
+ {
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
+ }
++
++ /**
++ * Test CompressedInputStream not hang when closed while reading
++ * @throws IOException
++ */
++ @Test(expected = EOFException.class)
++ public void testClose() throws IOException
++ {
++ CompressionParams param = CompressionParams.snappy(32);
++ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
++ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
++ InputStream blockingInput = new InputStream()
++ {
++ @Override
++ public int read() throws IOException
++ {
++ try
++ {
++ // 10 second cut off not to stop other test in case
++ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
++ }
++ catch (InterruptedException e)
++ {
++ throw new IOException("Interrupted as expected", e);
++ }
++ }
++ };
++ CompressionInfo info = new CompressionInfo(chunks, param);
++ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
++ {
++ new Thread(new Runnable()
++ {
++ @Override
++ public void run()
++ {
++ try
++ {
++ cis.close();
++ }
++ catch (Exception ignore) {}
++ }
++ }).start();
++ // block here
++ cis.read();
++ }
++ }
++
+ /**
+ * @param valuesToCheck array of longs of range(0-999)
+ * @throws Exception
+ */
+ private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
+ {
+ assert valuesToCheck != null && valuesToCheck.length > 0;
+
+ // write compressed data file of longs
+ File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
+ Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
+ MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+ CompressionParams param = CompressionParams.snappy(32);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
+ Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
++ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
+ {
- index.put(l, writer.position());
- writer.writeLong(l);
++ for (long l = 0L; l < 1000; l++)
++ {
++ index.put(l, writer.position());
++ writer.writeLong(l);
++ }
++ writer.finish();
+ }
- writer.finish();
+
+ CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
++ List<Pair<Long, Long>> sections = new ArrayList<>();
+ for (long l : valuesToCheck)
+ {
+ long position = index.get(l);
+ sections.add(Pair.create(position, position + 8));
+ }
+ CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
+ long totalSize = comp.getTotalSizeForSections(sections);
+ long expectedSize = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ expectedSize += c.length + 4;
+ assertEquals(expectedSize, totalSize);
+
+ // buffer up only relevant parts of file
+ int size = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ size += (c.length + 4); // 4bytes CRC
+ byte[] toRead = new byte[size];
+
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
++ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
+ {
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
++ int pos = 0;
++ for (CompressionMetadata.Chunk c : chunks)
++ {
++ f.seek(c.offset);
++ pos += f.read(toRead, pos, c.length + 4);
++ }
+ }
- f.close();
+
+ if (testTruncate)
+ {
+ byte [] actuallyRead = new byte[50];
+ System.arraycopy(toRead, 0, actuallyRead, 0, 50);
+ toRead = actuallyRead;
+ }
+
+ // read buffer using CompressedInputStream
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
+ ChecksumType.CRC32, () -> 1.0);
- DataInputStream in = new DataInputStream(input);
+
- for (int i = 0; i < sections.size(); i++)
++ try (DataInputStream in = new DataInputStream(input))
+ {
- input.position(sections.get(i).left);
- long readValue = in.readLong();
- assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
++ for (int i = 0; i < sections.size(); i++)
++ {
++ input.position(sections.get(i).left);
++ long readValue = in.readLong();
++ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
++ }
+ }
+ }
+}
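Both versions of the test size their buffers as "c.length + 4" because each compressed chunk is followed by a 4-byte checksum. A small self-contained sketch of reading and verifying that layout; the checksum algorithm and big-endian trailer here are assumptions for illustration only:

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Sketch of the chunk layout implied by "c.length + 4": payload followed by a 4-byte checksum.
final class ChunkTrailerSketch
{
    static boolean verify(byte[] chunkWithCrc)
    {
        int payloadLength = chunkWithCrc.length - 4;

        CRC32 crc = new CRC32();
        crc.update(chunkWithCrc, 0, payloadLength);

        int stored = ByteBuffer.wrap(chunkWithCrc, payloadLength, 4).getInt();
        return (int) crc.getValue() == stored;
    }
}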
[02/15] cassandra git commit: Fix CompressedInputStream for proper cleanup
Posted by yu...@apache.org.
Fix CompressedInputStream for proper cleanup
patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161
Branch: refs/heads/cassandra-2.2
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
private final BlockingQueue<byte[]> dataBuffer;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
private long totalCompressedBytesRead;
private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
- static class Reader extends WrappedRunnable
+ class Reader extends WrappedRunnable
{
private final InputStream source;
private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
protected void runMayThrow() throws Exception
{
byte[] compressedWithCRC;
- while (chunks.hasNext())
+ while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
{
CompressionMetadata.Chunk chunk = chunks.next();
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
int bufferRead = 0;
while (bufferRead < readLength)
{
- int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
- if (r < 0)
+ int r;
+ try
+ {
+ r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+ if (r < 0)
+ {
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
+ }
+ catch (IOException e)
{
dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
+ throw e;
}
bufferRead += r;
}
dataBuffer.put(compressedWithCRC);
}
+ synchronized(CompressedInputStream.this)
+ {
+ readerThread = null;
+ }
}
}
+
+ @Override
+ public void close() throws IOException
+ {
+ synchronized(this)
+ {
+ if (readerThread != null)
+ {
+ readerThread.interrupt();
+ readerThread = null;
+ }
+ }
+ }
+
}
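The dataBuffer.put(POISON_PILL) calls in the Reader above are the standard way for a producer to wake a consumer blocked on a queue when the source hits EOF or an error, deferring the actual exception to the consumer's thread. A minimal generic version of that handshake (names are illustrative, not Cassandra's):

import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal poison-pill handshake: the producer enqueues a sentinel on EOF/error,
// the consumer converts the sentinel into an exception on its own thread.
final class PoisonPillSketch
{
    private static final byte[] POISON_PILL = new byte[0];
    private final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(16);

    void producerProduced(byte[] chunk) throws InterruptedException
    {
        queue.put(chunk);
    }

    void producerFinished() throws InterruptedException
    {
        queue.put(POISON_PILL); // wake the consumer; the real error is raised on its side
    }

    byte[] consumeNext() throws IOException, InterruptedException
    {
        byte[] chunk = queue.take();
        if (chunk == POISON_PILL)
            throw new EOFException("No more chunks available");
        return chunk;
    }
}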
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
else
throw Throwables.propagate(e);
}
+ finally
+ {
+ cis.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.streaming.compress;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test CompressedInputStream not hang when closed while reading
+ * @throws Exception
+ */
+ @Test(expected = EOFException.class)
+ public void testClose() throws Exception
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut off not to stop other test in case
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
+ writer.close();
}
- writer.close();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
size += (c.length + 4); // 4bytes CRC
byte[] toRead = new byte[size];
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
+ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
{
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
+ int pos = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ {
+ f.seek(c.offset);
+ pos += f.read(toRead, pos, c.length + 4);
+ }
}
- f.close();
if (testTruncate)
{
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long exp = in.readLong();
- assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
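The testTruncate branch in the diff above shortens the buffered chunks to their first 50 bytes and then expects an EOFException from the read path. The same shape, reduced to a generic JUnit 4 test against a plain DataInputStream (hypothetical, not the committed test):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.util.Arrays;

import org.junit.Test;

// Generic shape of the truncation test: cut the serialized bytes short, then expect EOF.
public class TruncatedReadSketchTest
{
    @Test(expected = EOFException.class)
    public void readingPastTruncationThrowsEof() throws Exception
    {
        byte[] full = new byte[64];           // stands in for the compressed chunks + CRCs
        byte[] truncated = Arrays.copyOf(full, 50);

        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(truncated)))
        {
            for (int i = 0; i < 8; i++)
                in.readLong();                // the 7th long starts at offset 48 and cannot complete
        }
    }
}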
[03/15] cassandra git commit: Fix CompressedInputStream for proper cleanup
Posted by yu...@apache.org.
Fix CompressedInputStream for proper cleanup
patch by Chris Moos and yukim; reviewed by Paulo Motta for CASSANDRA-10012
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b9a9161
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b9a9161
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b9a9161
Branch: refs/heads/trunk
Commit: 8b9a9161caf678bfe2ead7aa970fc6b607372c42
Parents: 4a94f75
Author: Chris Moos <ch...@chrismoos.com>
Authored: Mon Nov 23 12:31:24 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 12:33:09 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86e5cb2..c4dd54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 449546f..b4a3065 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -45,7 +45,7 @@ public class CompressedInputStream extends InputStream
private final BlockingQueue<byte[]> dataBuffer;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@ -64,6 +64,8 @@ public class CompressedInputStream extends InputStream
private long totalCompressedBytesRead;
private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
@@ -75,9 +77,10 @@ public class CompressedInputStream extends InputStream
this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
@@ -143,7 +146,7 @@ public class CompressedInputStream extends InputStream
return totalCompressedBytesRead;
}
- static class Reader extends WrappedRunnable
+ class Reader extends WrappedRunnable
{
private final InputStream source;
private final Iterator<CompressionMetadata.Chunk> chunks;
@@ -159,7 +162,7 @@ public class CompressedInputStream extends InputStream
protected void runMayThrow() throws Exception
{
byte[] compressedWithCRC;
- while (chunks.hasNext())
+ while (!Thread.currentThread().isInterrupted() && chunks.hasNext())
{
CompressionMetadata.Chunk chunk = chunks.next();
@@ -169,16 +172,43 @@ public class CompressedInputStream extends InputStream
int bufferRead = 0;
while (bufferRead < readLength)
{
- int r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
- if (r < 0)
+ int r;
+ try
+ {
+ r = source.read(compressedWithCRC, bufferRead, readLength - bufferRead);
+ if (r < 0)
+ {
+ dataBuffer.put(POISON_PILL);
+ return; // throw exception where we consume dataBuffer
+ }
+ }
+ catch (IOException e)
{
dataBuffer.put(POISON_PILL);
- return; // throw exception where we consume dataBuffer
+ throw e;
}
bufferRead += r;
}
dataBuffer.put(compressedWithCRC);
}
+ synchronized(CompressedInputStream.this)
+ {
+ readerThread = null;
+ }
}
}
+
+ @Override
+ public void close() throws IOException
+ {
+ synchronized(this)
+ {
+ if (readerThread != null)
+ {
+ readerThread.interrupt();
+ readerThread = null;
+ }
+ }
+ }
+
}
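The inner while (bufferRead < readLength) loop above exists because InputStream.read may return fewer bytes than requested; the patch routes the negative-return EOF case through the poison pill instead of throwing on the reader thread. As a standalone sketch, the plain read-fully idiom looks like this:

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Read exactly 'length' bytes into 'dest', or fail: a single read() call may return a short count.
final class ReadFullySketch
{
    static void readFully(InputStream source, byte[] dest, int length) throws IOException
    {
        int read = 0;
        while (read < length)
        {
            int r = source.read(dest, read, length - read);
            if (r < 0)
                throw new EOFException("Stream ended after " + read + " of " + length + " bytes");
            read += r;
        }
    }
}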
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 0529496..4f60773 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -113,6 +113,10 @@ public class CompressedStreamReader extends StreamReader
else
throw Throwables.propagate(e);
}
+ finally
+ {
+ cis.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b9a9161/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index f3007da..87e0003 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -17,12 +17,10 @@
*/
package org.apache.cassandra.streaming.compress;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.util.*;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import org.junit.Test;
@@ -58,6 +56,53 @@ public class CompressedInputStreamTest
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test CompressedInputStream not hang when closed while reading
+ * @throws Exception
+ */
+ @Test(expected = EOFException.class)
+ public void testClose() throws Exception
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut off not to stop other test in case
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
@@ -70,18 +115,20 @@ public class CompressedInputStreamTest
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
+ writer.close();
}
- writer.close();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@ -100,14 +147,15 @@ public class CompressedInputStreamTest
size += (c.length + 4); // 4bytes CRC
byte[] toRead = new byte[size];
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
+ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
{
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
+ int pos = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ {
+ f.seek(c.offset);
+ pos += f.read(toRead, pos, c.length + 4);
+ }
}
- f.close();
if (testTruncate)
{
@@ -119,13 +167,15 @@ public class CompressedInputStreamTest
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long exp = in.readLong();
- assert exp == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + exp;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
[08/15] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe
Branch: refs/heads/cassandra-3.0
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
-2.1.12
+2.2.4
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
private static final byte[] POISON_PILL = new byte[0];
private long totalCompressedBytesRead;
- private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
+ public CompressedInputStream(InputStream source, CompressionInfo info)
{
this.info = info;
- this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
- this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+ this.checksum = new Adler32();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
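In the 2.2 merge above the constructor drops the hasPostCompressionAdlerChecksums flag and always builds an Adler32. Both Adler32 and CRC32 come from java.util.zip and implement the same Checksum interface, which is what lets verification code stay unchanged when the algorithm is swapped (or, on 3.0, supplied via a ChecksumType factory). A small demonstration, with names of my own:

import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Adler32 and CRC32 both implement java.util.zip.Checksum, so verification code
// can be written once against the interface.
final class ChecksumSketch
{
    static long checksumOf(Checksum checksum, byte[] data)
    {
        checksum.reset();
        checksum.update(data, 0, data.length);
        return checksum.getValue();
    }

    public static void main(String[] args)
    {
        byte[] data = "compressed chunk bytes".getBytes();
        System.out.println("adler32 = " + checksumOf(new Adler32(), data));
        System.out.println("crc32   = " + checksumOf(new CRC32(), data));
    }
}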
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test CompressedInputStream not hang when closed while reading
- * @throws Exception
++ * @throws IOException
+ */
+ @Test(expected = EOFException.class)
- public void testClose() throws Exception
++ public void testClose() throws IOException
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10 second cut off not to stop other test in case
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
- try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs of range(0-999)
* @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
- writer.close();
++ writer.finish();
}
- writer.finish();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
- CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long readValue = in.readLong();
- assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}
[10/15] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by yu...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b3cfae4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b3cfae4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b3cfae4
Branch: refs/heads/trunk
Commit: 0b3cfae4e619d1ece5ff8afc774eeb52b93166d8
Parents: 9fe790d 056055f
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:58:58 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:58:58 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 ++++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compression/CompressedInputStreamTest.java | 88 ++++++++++++++++----
4 files changed, 114 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4182cc1,d11be26..608d8f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -11,7 -3,16 +11,8 @@@ Merged from 2.2
* Fix SimpleDateType type compatibility (CASSANDRA-10027)
* (Hadoop) fix splits calculation (CASSANDRA-10640)
* (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
- * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
- * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
- * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
- * Expose phi values from failure detector via JMX and tweak debug
- and trace logging (CASSANDRA-9526)
- * Fix RangeNamesQueryPager (CASSANDRA-10509)
- * Deprecate Pig support (CASSANDRA-10542)
- * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
Merged from 2.1:
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index ccd0ac5,daa339a..56dc63a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -42,10 -41,9 +42,10 @@@ public class CompressedInputStream exte
private final CompressionInfo info;
// chunk buffer
private final BlockingQueue<byte[]> dataBuffer;
+ private final Supplier<Double> crcCheckChanceSupplier;
// uncompressed bytes
- private byte[] buffer;
+ private final byte[] buffer;
// offset from the beginning of the buffer
protected long bufferOffset = 0;
@@@ -67,16 -67,16 +69,17 @@@
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info)
+ public CompressedInputStream(InputStream source, CompressionInfo info, ChecksumType checksumType, Supplier<Double> crcCheckChanceSupplier)
{
this.info = info;
- this.checksum = new Adler32();
+ this.checksum = checksumType.newInstance();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
+ this.crcCheckChanceSupplier = crcCheckChanceSupplier;
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b3cfae4/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 5646592,0000000..2162e32
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@@ -1,129 -1,0 +1,183 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.compression;
+
+import java.io.*;
+import java.util.*;
++import java.util.concurrent.SynchronousQueue;
++import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.streaming.compress.CompressedInputStream;
+import org.apache.cassandra.streaming.compress.CompressionInfo;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ */
+public class CompressedInputStreamTest
+{
+ @Test
+ public void testCompressedRead() throws Exception
+ {
+ testCompressedReadWith(new long[]{0L}, false);
+ testCompressedReadWith(new long[]{1L}, false);
+ testCompressedReadWith(new long[]{100L}, false);
+
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, false);
+ }
+
+ @Test(expected = EOFException.class)
+ public void testTruncatedRead() throws Exception
+ {
+ testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
+ }
++
++ /**
++ * Test that CompressedInputStream does not hang when it is closed while a read is blocked
++ * @throws IOException
++ */
++ @Test(expected = EOFException.class)
++ public void testClose() throws IOException
++ {
++ CompressionParams param = CompressionParams.snappy(32);
++ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
++ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
++ InputStream blockingInput = new InputStream()
++ {
++ @Override
++ public int read() throws IOException
++ {
++ try
++ {
++ // 10-second cut-off so a failure here does not block the rest of the test run
++ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
++ }
++ catch (InterruptedException e)
++ {
++ throw new IOException("Interrupted as expected", e);
++ }
++ }
++ };
++ CompressionInfo info = new CompressionInfo(chunks, param);
++ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, ChecksumType.CRC32, () -> 1.0))
++ {
++ new Thread(new Runnable()
++ {
++ @Override
++ public void run()
++ {
++ try
++ {
++ cis.close();
++ }
++ catch (Exception ignore) {}
++ }
++ }).start();
++ // block here
++ cis.read();
++ }
++ }
++
+ /**
+ * @param valuesToCheck array of longs in the range 0-999
+ * @throws Exception
+ */
+ private void testCompressedReadWith(long[] valuesToCheck, boolean testTruncate) throws Exception
+ {
+ assert valuesToCheck != null && valuesToCheck.length > 0;
+
+ // write compressed data file of longs
+ File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
+ Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
+ MetadataCollector collector = new MetadataCollector(new ClusteringComparator(BytesType.instance));
+ CompressionParams param = CompressionParams.snappy(32);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
+ Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
++ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
+ {
- index.put(l, writer.position());
- writer.writeLong(l);
++ for (long l = 0L; l < 1000; l++)
++ {
++ index.put(l, writer.position());
++ writer.writeLong(l);
++ }
++ writer.finish();
+ }
- writer.finish();
+
+ CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
++ List<Pair<Long, Long>> sections = new ArrayList<>();
+ for (long l : valuesToCheck)
+ {
+ long position = index.get(l);
+ sections.add(Pair.create(position, position + 8));
+ }
+ CompressionMetadata.Chunk[] chunks = comp.getChunksForSections(sections);
+ long totalSize = comp.getTotalSizeForSections(sections);
+ long expectedSize = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ expectedSize += c.length + 4;
+ assertEquals(expectedSize, totalSize);
+
+ // buffer up only relevant parts of file
+ int size = 0;
+ for (CompressionMetadata.Chunk c : chunks)
+ size += (c.length + 4); // 4-byte CRC
+ byte[] toRead = new byte[size];
+
- RandomAccessFile f = new RandomAccessFile(tmp, "r");
- int pos = 0;
- for (CompressionMetadata.Chunk c : chunks)
++ try (RandomAccessFile f = new RandomAccessFile(tmp, "r"))
+ {
- f.seek(c.offset);
- pos += f.read(toRead, pos, c.length + 4);
++ int pos = 0;
++ for (CompressionMetadata.Chunk c : chunks)
++ {
++ f.seek(c.offset);
++ pos += f.read(toRead, pos, c.length + 4);
++ }
+ }
- f.close();
+
+ if (testTruncate)
+ {
+ byte [] actuallyRead = new byte[50];
+ System.arraycopy(toRead, 0, actuallyRead, 0, 50);
+ toRead = actuallyRead;
+ }
+
+ // read buffer using CompressedInputStream
+ CompressionInfo info = new CompressionInfo(chunks, param);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info,
+ ChecksumType.CRC32, () -> 1.0);
- DataInputStream in = new DataInputStream(input);
+
- for (int i = 0; i < sections.size(); i++)
++ try (DataInputStream in = new DataInputStream(input))
+ {
- input.position(sections.get(i).left);
- long readValue = in.readLong();
- assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
++ for (int i = 0; i < sections.size(); i++)
++ {
++ input.position(sections.get(i).left);
++ long readValue = in.readLong();
++ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
++ }
+ }
+ }
+}
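As an aside on the testClose technique above: the anonymous InputStream blocks because SynchronousQueue has no capacity, so poll(timeout, unit) parks the caller until another thread hands over an element or the timeout expires. A small, self-contained sketch of that hand-off idiom follows; the class and variable names are illustrative, not part of the patch.

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public final class BlockingReadSketch
{
    public static void main(String[] args) throws Exception
    {
        final SynchronousQueue<Integer> handoff = new SynchronousQueue<>();
        InputStream blocking = new InputStream()
        {
            @Override
            public int read() throws IOException
            {
                try
                {
                    // Parks here until a producer offers a value, or gives up after 10 seconds
                    // (poll returns null on timeout, so requireNonNull fails the call).
                    return Objects.requireNonNull(handoff.poll(10, TimeUnit.SECONDS));
                }
                catch (InterruptedException e)
                {
                    throw new IOException("interrupted while waiting for data", e);
                }
            }
        };
        // Producer thread: without it, read() would block for the full 10 seconds.
        new Thread(() ->
        {
            try { handoff.put(42); } catch (InterruptedException ignore) {}
        }).start();
        System.out.println(blocking.read()); // prints 42 once the hand-off completes
    }
}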
[07/15] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by yu...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/056055fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/056055fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/056055fe
Branch: refs/heads/cassandra-2.2
Commit: 056055febd55e1c19a6216627b8568e60141b9fa
Parents: 2aa8342 8b9a916
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Nov 23 13:17:39 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Nov 23 13:17:39 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedInputStream.java | 46 +++++++--
.../compress/CompressedStreamReader.java | 4 +
.../compress/CompressedInputStreamTest.java | 98 +++++++++++++++-----
4 files changed, 117 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 146a0ce,c4dd54e..d11be26
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,5 +1,18 @@@
-2.1.12
+2.2.4
+ * Don't do anticompaction after subrange repair (CASSANDRA-10422)
+ * Fix SimpleDateType type compatibility (CASSANDRA-10027)
+ * (Hadoop) fix splits calculation (CASSANDRA-10640)
+ * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
+ * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
+ * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
+ * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
+ * Expose phi values from failure detector via JMX and tweak debug
+ and trace logging (CASSANDRA-9526)
+ * Fix RangeNamesQueryPager (CASSANDRA-10509)
+ * Deprecate Pig support (CASSANDRA-10542)
+ * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+Merged from 2.1:
+ * Fix CompressedInputStream for proper cleanup (CASSANDRA-10012)
* (cqlsh) Support counters in COPY commands (CASSANDRA-9043)
* Try next replica if not possible to connect to primary replica on
ColumnFamilyRecordReader (CASSANDRA-2388)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 872afcd,b4a3065..daa339a
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -60,20 -62,25 +60,23 @@@ public class CompressedInputStream exte
private static final byte[] POISON_PILL = new byte[0];
private long totalCompressedBytesRead;
- private final boolean hasPostCompressionAdlerChecksums;
+ private Thread readerThread;
+
/**
* @param source Input source to read compressed data from
* @param info Compression info
*/
- public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums)
+ public CompressedInputStream(InputStream source, CompressionInfo info)
{
this.info = info;
- this.checksum = hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
- this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
+ this.checksum = new Adler32();
this.buffer = new byte[info.parameters.chunkLength()];
// buffer is limited to store up to 1024 chunks
- this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024));
+ this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
- new Thread(new Reader(source, info, dataBuffer)).start();
+ readerThread = new Thread(new Reader(source, info, dataBuffer));
+ readerThread.start();
}
public int read() throws IOException
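The close() body that makes this cleanup work is not visible in these hunks, so the following is only a sketch of the pattern, under the assumption that close() interrupts the reader thread kept above: the interrupted Reader falls into its failure path, posts the existing POISON_PILL onto dataBuffer, and a consumer blocked in read() then fails fast with EOFException instead of hanging. A stripped-down, hypothetical model (not the actual Cassandra class):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public final class CleanupPatternSketch
{
    private static final byte[] POISON_PILL = new byte[0];
    private final BlockingQueue<byte[]> dataBuffer = new ArrayBlockingQueue<>(16);
    private volatile Thread readerThread;

    public CleanupPatternSketch(InputStream source)
    {
        // Keep a handle on the reader thread so close() can interrupt it later.
        readerThread = new Thread(() ->
        {
            try
            {
                byte[] chunk = new byte[32];
                while (source.read(chunk) != -1)
                    dataBuffer.put(chunk.clone());
            }
            catch (Exception e)
            {
                // Failure/interrupt path: wake up any consumer blocked in read().
                try { dataBuffer.put(POISON_PILL); } catch (InterruptedException ignore) {}
            }
        });
        readerThread.start();
    }

    public byte[] read() throws IOException
    {
        try
        {
            byte[] chunk = dataBuffer.take();
            if (chunk == POISON_PILL)
                throw new EOFException("stream was closed or the reader failed");
            return chunk;
        }
        catch (InterruptedException e)
        {
            throw new IOException(e);
        }
    }

    public void close()
    {
        Thread t = readerThread;
        if (t != null)
        {
            t.interrupt();       // a reader blocked on an interruptible source wakes up here
            readerThread = null; // and its catch block posts the poison pill
        }
    }
}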
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/056055fe/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 0214c76,87e0003..e692441
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@@ -58,6 -56,53 +56,53 @@@ public class CompressedInputStreamTes
{
testCompressedReadWith(new long[]{1L, 122L, 123L, 124L, 456L}, true);
}
+
+ /**
+ * Test that CompressedInputStream does not hang when it is closed while a read is blocked
- * @throws Exception
++ * @throws IOException
+ */
+ @Test(expected = EOFException.class)
- public void testClose() throws Exception
++ public void testClose() throws IOException
+ {
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ CompressionMetadata.Chunk[] chunks = {new CompressionMetadata.Chunk(0, 100)};
+ final SynchronousQueue<Integer> blocker = new SynchronousQueue<>();
+ InputStream blockingInput = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ // 10-second cut-off so a failure here does not block the rest of the test run
+ return Objects.requireNonNull(blocker.poll(10, TimeUnit.SECONDS));
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException("Interrupted as expected", e);
+ }
+ }
+ };
+ CompressionInfo info = new CompressionInfo(chunks, param);
- try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info, true))
++ try (CompressedInputStream cis = new CompressedInputStream(blockingInput, info))
+ {
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ cis.close();
+ }
+ catch (Exception ignore) {}
+ }
+ }).start();
+ // block here
+ cis.read();
+ }
+ }
+
/**
* @param valuesToCheck array of longs in the range 0-999
* @throws Exception
@@@ -70,18 -115,20 +115,20 @@@
File tmp = new File(File.createTempFile("cassandra", "unittest").getParent(), "ks-cf-ib-1-Data.db");
Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
- CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
- CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
- Map<Long, Long> index = new HashMap<Long, Long>();
- for (long l = 0L; l < 1000; l++)
+ CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap());
+ Map<Long, Long> index = new HashMap<>();
+ try (CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector))
{
- index.put(l, writer.getFilePointer());
- writer.stream.writeLong(l);
+ for (long l = 0L; l < 1000; l++)
+ {
+ index.put(l, writer.getFilePointer());
+ writer.stream.writeLong(l);
+ }
- writer.close();
++ writer.finish();
}
- writer.finish();
CompressionMetadata comp = CompressionMetadata.create(tmp.getAbsolutePath());
- List<Pair<Long, Long>> sections = new ArrayList<Pair<Long, Long>>();
+ List<Pair<Long, Long>> sections = new ArrayList<>();
for (long l : valuesToCheck)
{
long position = index.get(l);
@@@ -118,14 -166,16 +166,16 @@@
// read buffer using CompressedInputStream
CompressionInfo info = new CompressionInfo(chunks, param);
- CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true);
+ CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info);
- DataInputStream in = new DataInputStream(input);
- for (int i = 0; i < sections.size(); i++)
+ try (DataInputStream in = new DataInputStream(input))
{
- input.position(sections.get(i).left);
- long readValue = in.readLong();
- assert readValue == valuesToCheck[i] : "expected " + valuesToCheck[i] + " but was " + readValue;
+ for (int i = 0; i < sections.size(); i++)
+ {
+ input.position(sections.get(i).left);
+ long readValue = in.readLong();
+ assertEquals("expected " + valuesToCheck[i] + " but was " + readValue, valuesToCheck[i], readValue);
+ }
}
}
}