You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/12/02 04:44:31 UTC
cassandra git commit: Fix integer overflow in DataOutputBuffer
doubling and test as best as possible given that allocating 2 gigs in a unit
test is problematic.
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 2491ede35 -> a320737b1
Fix integer overflow in DataOutputBuffer doubling and test as best as possible given that allocating 2 gigs in a unit test is problematic.
Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-10592
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a320737b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a320737b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a320737b
Branch: refs/heads/cassandra-2.2
Commit: a320737b18c19e3ec59035e5e487f2af1dcd0172
Parents: 2491ede
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Oct 27 12:19:14 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Dec 1 22:34:28 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/util/BufferedDataOutputStreamPlus.java | 20 +-
.../cassandra/io/util/DataOutputBuffer.java | 78 ++++++-
.../io/util/DataOutputBufferFixed.java | 2 +-
.../cassandra/io/util/SafeMemoryWriter.java | 10 +-
.../cassandra/io/util/DataOutputTest.java | 202 +++++++++++++++++++
6 files changed, 296 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7541212..cf73f57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
* Fix RangeNamesQueryPager (CASSANDRA-10509)
* Deprecate Pig support (CASSANDRA-10542)
* Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+ * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
Merged from 2.1:
* Add proper error handling to stream receiver (CASSANDRA-10774)
* Warn or fail when changing cluster topology live (CASSANDRA-10243)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index 5669a8d..d55db47 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -118,7 +118,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
}
else
{
- doFlush();
+ doFlush(len - copied);
}
}
}
@@ -142,11 +142,12 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
else
{
assert toWrite.isDirect();
- if (toWrite.remaining() > buffer.remaining())
+ int toWriteRemaining = toWrite.remaining();
+ if (toWriteRemaining > buffer.remaining())
{
- doFlush();
+ doFlush(toWriteRemaining);
MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
- if (toWrite.remaining() > buffer.remaining())
+ if (toWriteRemaining > buffer.remaining())
{
while (hollowBuffer.hasRemaining())
channel.write(hollowBuffer);
@@ -254,7 +255,10 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
write(buffer);
}
- protected void doFlush() throws IOException
+ /*
+ * count is the number of bytes still to be written; it is not reduced by the capacity already remaining in the buffer
+ */
+ protected void doFlush(int count) throws IOException
{
buffer.flip();
@@ -267,13 +271,13 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
@Override
public void flush() throws IOException
{
- doFlush();
+ doFlush(0);
}
@Override
public void close() throws IOException
{
- doFlush();
+ doFlush(0);
channel.close();
FileUtils.clean(buffer);
buffer = null;
@@ -282,7 +286,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
protected void ensureRemaining(int minimum) throws IOException
{
if (buffer.remaining() < minimum)
- doFlush();
+ doFlush(minimum);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 6ffc895..6ea6d97 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -21,6 +21,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
+import org.apache.cassandra.config.Config;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
/**
* An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
@@ -30,6 +34,11 @@ import java.nio.channels.WritableByteChannel;
*/
public class DataOutputBuffer extends BufferedDataOutputStreamPlus
{
+ /*
+ * Buffer capacity threshold, in megabytes, above which resizing switches from doubling to growing by 50%
+ */
+ private static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64);
+
public DataOutputBuffer()
{
this(128);
@@ -51,16 +60,70 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
throw new UnsupportedOperationException();
}
+ //The actual maximum array size observed in HotSpot is Integer.MAX_VALUE - 2;
+ //ByteArrayOutputStream uses the more conservative Integer.MAX_VALUE - 8
+ @VisibleForTesting
+ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+ @VisibleForTesting
+ static int saturatedArraySizeCast(long size)
+ {
+ Preconditions.checkArgument(size >= 0);
+ return (int)Math.min(MAX_ARRAY_SIZE, size);
+ }
+
+ @VisibleForTesting
+ static int checkedArraySizeCast(long size)
+ {
+ Preconditions.checkArgument(size >= 0);
+ Preconditions.checkArgument(size <= MAX_ARRAY_SIZE);
+ return (int)size;
+ }
+
@Override
- protected void doFlush() throws IOException
+ protected void doFlush(int count) throws IOException
+ {
+ reallocate(count);
+ }
+
+ //Hack for test, make it possible to override checking the buffer capacity
+ @VisibleForTesting
+ long capacity()
+ {
+ return buffer.capacity();
+ }
+
+ @VisibleForTesting
+ long validateReallocation(long newSize)
+ {
+ int saturatedSize = saturatedArraySizeCast(newSize);
+ if (saturatedSize <= capacity())
+ throw new RuntimeException();
+ return saturatedSize;
+ }
+
+ @VisibleForTesting
+ long calculateNewSize(long count)
{
- reallocate(buffer.capacity() * 2);
+ long capacity = capacity();
+ //Both sides of this max expression need to use long arithmetic to avoid integer overflow
+ //count and capacity are longs so that ensures it right now.
+ long newSize = capacity + count;
+
+ //For large buffers don't double, increase by 50%
+ if (capacity > 1024L * 1024L * DOUBLING_THRESHOLD)
+ newSize = Math.max((capacity * 3L) / 2L, newSize);
+ else
+ newSize = Math.max(capacity * 2L, newSize);
+
+ return validateReallocation(newSize);
}
- protected void reallocate(long newSize)
+ protected void reallocate(long count)
{
- assert newSize <= Integer.MAX_VALUE;
- ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize);
+ if (count <= 0)
+ return;
+ ByteBuffer newBuffer = ByteBuffer.allocate(checkedArraySizeCast(calculateNewSize(count)));
buffer.flip();
newBuffer.put(buffer);
buffer = newBuffer;
@@ -72,12 +135,13 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
return new GrowingChannel();
}
- private final class GrowingChannel implements WritableByteChannel
+ @VisibleForTesting
+ final class GrowingChannel implements WritableByteChannel
{
public int write(ByteBuffer src) throws IOException
{
int count = src.remaining();
- reallocate(Math.max((buffer.capacity() * 3) / 2, buffer.capacity() + count));
+ reallocate(count);
buffer.put(src);
return count;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
index fb8d671..c815c9e 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
@@ -47,7 +47,7 @@ public class DataOutputBufferFixed extends DataOutputBuffer
}
@Override
- protected void doFlush() throws IOException
+ protected void doFlush(int count) throws IOException
{
throw new BufferOverflowException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index aad3266..24eb93c 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -42,8 +42,10 @@ public class SafeMemoryWriter extends DataOutputBuffer
return memory;
}
- protected void reallocate(long newCapacity)
+ @Override
+ protected void reallocate(long count)
{
+ long newCapacity = calculateNewSize(count);
if (newCapacity != capacity())
{
long position = length();
@@ -93,6 +95,12 @@ public class SafeMemoryWriter extends DataOutputBuffer
return this;
}
+ @Override
+ public long validateReallocation(long newSize)
+ {
+ return newSize;
+ }
+
private static long tailOffset(Memory memory)
{
return Math.max(0, memory.size - Integer.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index bbdf4e1..1fb5597 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -31,7 +31,10 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.Assert;
@@ -83,6 +86,18 @@ public class DataOutputTest
}
@Test
+ public void testDataOutputBufferZeroReallocate() throws IOException
+ {
+ try (DataOutputBufferSpy write = new DataOutputBufferSpy())
+ {
+ for (int ii = 0; ii < 1000000; ii++)
+ {
+ write.superReallocate(0);
+ }
+ }
+ }
+
+ @Test
public void testDataOutputDirectByteBuffer() throws IOException
{
ByteBuffer buf = wrap(new byte[345], true);
@@ -102,6 +117,193 @@ public class DataOutputTest
testRead(test, canon);
}
+ private static class DataOutputBufferSpy extends DataOutputBuffer
+ {
+ Deque<Long> sizes = new ArrayDeque<>();
+
+ DataOutputBufferSpy()
+ {
+ sizes.offer(128L);
+ }
+
+ void publicFlush() throws IOException
+ {
+ //Going to allow it to double instead of specifying a count
+ doFlush(1);
+ }
+
+ void superReallocate(int count) throws IOException
+ {
+ super.reallocate(count);
+ }
+
+ @Override
+ protected void reallocate(long count)
+ {
+ if (count <= 0)
+ return;
+ Long lastSize = sizes.peekLast();
+ long newSize = calculateNewSize(count);
+ sizes.offer(newSize);
+ if (newSize > DataOutputBuffer.MAX_ARRAY_SIZE)
+ throw new RuntimeException();
+ if (newSize < 0)
+ throw new AssertionError();
+ if (lastSize != null && newSize <= lastSize)
+ throw new AssertionError();
+ }
+
+ @Override
+ protected long capacity()
+ {
+ return sizes.peekLast().intValue();
+ }
+ }
+
+ //Check for overflow at the max size, without actually allocating all the memory
+ @Test
+ public void testDataOutputBufferMaxSizeFake() throws IOException
+ {
+ try (DataOutputBufferSpy write = new DataOutputBufferSpy())
+ {
+ boolean threw = false;
+ try
+ {
+ while (true)
+ write.publicFlush();
+ }
+ catch (RuntimeException e) {
+ if (e.getClass() == RuntimeException.class)
+ threw = true;
+ }
+ Assert.assertTrue(threw);
+ Assert.assertTrue(write.sizes.peekLast() >= DataOutputBuffer.MAX_ARRAY_SIZE);
+ }
+ }
+
+ @Test
+ public void testDataOutputBufferMaxSize() throws IOException
+ {
+ //Need a lot of heap to run this test for real.
+ //Tested everything else as much as possible since we can't do it all the time
+ if (Runtime.getRuntime().maxMemory() < 5033164800L)
+ return;
+
+ try (DataOutputBuffer write = new DataOutputBuffer())
+ {
+ //Doesn't throw up to DataOutputBuffer.MAX_ARRAY_SIZE which is the array size limit in Java
+ for (int ii = 0; ii < DataOutputBuffer.MAX_ARRAY_SIZE / 8; ii++)
+ write.writeLong(0);
+ write.write(new byte[7]);
+
+ //Should fail due to validation
+ checkThrowsRuntimeException(validateReallocationCallable( write, DataOutputBuffer.MAX_ARRAY_SIZE + 1));
+ //Check that it does throw
+ checkThrowsRuntimeException(new Callable<Object>()
+ {
+ public Object call() throws Exception
+ {
+ write.write(42);
+ return null;
+ }
+ });
+ }
+ }
+
+ //Can't test it for real without tons of heap so test as much validation as possible
+ @Test
+ public void testDataOutputBufferBigReallocation() throws Exception
+ {
+ //Check saturating cast behavior
+ Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE + 1L));
+ Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE));
+ Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE - 1));
+ Assert.assertEquals(0, DataOutputBuffer.saturatedArraySizeCast(0));
+ Assert.assertEquals(1, DataOutputBuffer.saturatedArraySizeCast(1));
+ checkThrowsIAE(saturatedArraySizeCastCallable(-1));
+
+ //Check checked cast behavior
+ checkThrowsIAE(checkedArraySizeCastCallable(DataOutputBuffer.MAX_ARRAY_SIZE + 1L));
+ Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.checkedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE));
+ Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, DataOutputBuffer.checkedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE - 1));
+ Assert.assertEquals(0, DataOutputBuffer.checkedArraySizeCast(0));
+ Assert.assertEquals(1, DataOutputBuffer.checkedArraySizeCast(1));
+ checkThrowsIAE(checkedArraySizeCastCallable(-1));
+
+
+ try (DataOutputBuffer write = new DataOutputBuffer())
+ {
+ //Checked validation performed by DOB
+ Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE + 1L));
+ Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE));
+ Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE - 1));
+ checkThrowsRuntimeException(validateReallocationCallable( write, 0));
+ checkThrowsRuntimeException(validateReallocationCallable( write, 1));
+ checkThrowsIAE(validateReallocationCallable( write, -1));
+ }
+ }
+
+ Callable<Object> saturatedArraySizeCastCallable(final long value)
+ {
+ return new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ return DataOutputBuffer.saturatedArraySizeCast(value);
+ }
+ };
+ }
+
+ Callable<Object> checkedArraySizeCastCallable(final long value)
+ {
+ return new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ return DataOutputBuffer.checkedArraySizeCast(value);
+ }
+ };
+ }
+
+ Callable<Object> validateReallocationCallable(final DataOutputBuffer write, final long value)
+ {
+ return new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ return write.validateReallocation(value);
+ }
+ };
+ }
+
+ private static void checkThrowsIAE(Callable<Object> c)
+ {
+ checkThrowsException(c, IllegalArgumentException.class);
+ }
+
+ private static void checkThrowsRuntimeException(Callable<Object> c)
+ {
+ checkThrowsException(c, RuntimeException.class);
+ }
+
+ private static void checkThrowsException(Callable<Object> c, Class<?> exceptionClass)
+ {
+ boolean threw = false;
+ try
+ {
+ c.call();
+ }
+ catch (Throwable t)
+ {
+ if (t.getClass() == exceptionClass)
+ threw = true;
+ }
+ Assert.assertTrue(threw);
+ }
+
@Test
public void testSafeMemoryWriter() throws IOException
{