You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/12/02 04:44:31 UTC

cassandra git commit: Fix integer overflow in DataOutputBuffer doubling and test as best as possible given that allocating 2 gigs in a unit test is problematic.

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 2491ede35 -> a320737b1


Fix integer overflow in DataOutputBuffer doubling and test as best as possible given that allocating 2 gigs in a unit test is problematic.

Patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-10592


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a320737b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a320737b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a320737b

Branch: refs/heads/cassandra-2.2
Commit: a320737b18c19e3ec59035e5e487f2af1dcd0172
Parents: 2491ede
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Oct 27 12:19:14 2015 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Tue Dec 1 22:34:28 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/util/BufferedDataOutputStreamPlus.java   |  20 +-
 .../cassandra/io/util/DataOutputBuffer.java     |  78 ++++++-
 .../io/util/DataOutputBufferFixed.java          |   2 +-
 .../cassandra/io/util/SafeMemoryWriter.java     |  10 +-
 .../cassandra/io/util/DataOutputTest.java       | 202 +++++++++++++++++++
 6 files changed, 296 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7541212..cf73f57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
  * Fix RangeNamesQueryPager (CASSANDRA-10509)
  * Deprecate Pig support (CASSANDRA-10542)
  * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
+ * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
 Merged from 2.1:
  * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index 5669a8d..d55db47 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -118,7 +118,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
             }
             else
             {
-                doFlush();
+                doFlush(len - copied);
             }
         }
     }
@@ -142,11 +142,12 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
         else
         {
             assert toWrite.isDirect();
-            if (toWrite.remaining() > buffer.remaining())
+            int toWriteRemaining = toWrite.remaining();
+            if (toWriteRemaining > buffer.remaining())
             {
-                doFlush();
+                doFlush(toWriteRemaining);
                 MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
-                if (toWrite.remaining() > buffer.remaining())
+                if (toWriteRemaining > buffer.remaining())
                 {
                     while (hollowBuffer.hasRemaining())
                         channel.write(hollowBuffer);
@@ -254,7 +255,10 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
             write(buffer);
     }
 
-    protected void doFlush() throws IOException
+    /*
+     * count is the number of bytes still to be written, not reduced by whatever capacity already remains in the buffer
+     */
+    protected void doFlush(int count) throws IOException
     {
         buffer.flip();
 
@@ -267,13 +271,13 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
     @Override
     public void flush() throws IOException
     {
-        doFlush();
+        doFlush(0);
     }
 
     @Override
     public void close() throws IOException
     {
-        doFlush();
+        doFlush(0);
         channel.close();
         FileUtils.clean(buffer);
         buffer = null;
@@ -282,7 +286,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
     protected void ensureRemaining(int minimum) throws IOException
     {
         if (buffer.remaining() < minimum)
-            doFlush();
+            doFlush(minimum);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 6ffc895..6ea6d97 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -21,6 +21,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
+import org.apache.cassandra.config.Config;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
@@ -30,6 +34,11 @@ import java.nio.channels.WritableByteChannel;
  */
 public class DataOutputBuffer extends BufferedDataOutputStreamPlus
 {
+    /*
+     * Threshold, in megabytes, at which resizing transitions from doubling to increasing by 50%
+     */
+    private static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64);
+
     public DataOutputBuffer()
     {
         this(128);
@@ -51,16 +60,70 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
         throw new UnsupportedOperationException();
     }
 
+    //The actual limit observed in HotSpot is only Integer.MAX_VALUE - 2
+    //ByteArrayOutputStream uses -8
+    @VisibleForTesting
+    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+    @VisibleForTesting
+    static int saturatedArraySizeCast(long size)
+    {
+        Preconditions.checkArgument(size >= 0);
+        return (int)Math.min(MAX_ARRAY_SIZE, size);
+    }
+
+    @VisibleForTesting
+    static int checkedArraySizeCast(long size)
+    {
+        Preconditions.checkArgument(size >= 0);
+        Preconditions.checkArgument(size <= MAX_ARRAY_SIZE);
+        return (int)size;
+    }
+
     @Override
-    protected void doFlush() throws IOException
+    protected void doFlush(int count) throws IOException
+    {
+        reallocate(count);
+    }
+
+    //Test hook: lets a subclass override how the buffer capacity is read
+    @VisibleForTesting
+    long capacity()
+    {
+        return buffer.capacity();
+    }
+
+    @VisibleForTesting
+    long validateReallocation(long newSize)
+    {
+        int saturatedSize = saturatedArraySizeCast(newSize);
+        if (saturatedSize <= capacity())
+            throw new RuntimeException();
+        return saturatedSize;
+    }
+
+    @VisibleForTesting
+    long calculateNewSize(long count)
     {
-        reallocate(buffer.capacity() * 2);
+        long capacity = capacity();
+        //Both sides of the max expression below must use long arithmetic to avoid integer overflow.
+        //count and capacity are both longs, which currently guarantees this.
+        long newSize = capacity + count;
+
+        //For large buffers don't double, increase by 50%
+        if (capacity > 1024L * 1024L * DOUBLING_THRESHOLD)
+            newSize = Math.max((capacity * 3L) / 2L, newSize);
+        else
+            newSize = Math.max(capacity * 2L, newSize);
+
+        return validateReallocation(newSize);
     }
 
-    protected void reallocate(long newSize)
+    protected void reallocate(long count)
     {
-        assert newSize <= Integer.MAX_VALUE;
-        ByteBuffer newBuffer = ByteBuffer.allocate((int) newSize);
+        if (count <= 0)
+            return;
+        ByteBuffer newBuffer = ByteBuffer.allocate(checkedArraySizeCast(calculateNewSize(count)));
         buffer.flip();
         newBuffer.put(buffer);
         buffer = newBuffer;
@@ -72,12 +135,13 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
         return new GrowingChannel();
     }
 
-    private final class GrowingChannel implements WritableByteChannel
+    @VisibleForTesting
+    final class GrowingChannel implements WritableByteChannel
     {
         public int write(ByteBuffer src) throws IOException
         {
             int count = src.remaining();
-            reallocate(Math.max((buffer.capacity() * 3) / 2, buffer.capacity() + count));
+            reallocate(count);
             buffer.put(src);
             return count;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
index fb8d671..c815c9e 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
@@ -47,7 +47,7 @@ public class DataOutputBufferFixed extends DataOutputBuffer
     }
 
     @Override
-    protected void doFlush() throws IOException
+    protected void doFlush(int count) throws IOException
     {
         throw new BufferOverflowException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index aad3266..24eb93c 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -42,8 +42,10 @@ public class SafeMemoryWriter extends DataOutputBuffer
         return memory;
     }
 
-    protected void reallocate(long newCapacity)
+    @Override
+    protected void reallocate(long count)
     {
+        long newCapacity = calculateNewSize(count);
         if (newCapacity != capacity())
         {
             long position = length();
@@ -93,6 +95,12 @@ public class SafeMemoryWriter extends DataOutputBuffer
         return this;
     }
 
+    @Override
+    public long validateReallocation(long newSize)
+    {
+        return newSize;
+    }
+
     private static long tailOffset(Memory memory)
     {
         return Math.max(0, memory.size - Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a320737b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index bbdf4e1..1fb5597 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -31,7 +31,10 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.junit.Assert;
@@ -83,6 +86,18 @@ public class DataOutputTest
     }
 
     @Test
+    public void testDataOutputBufferZeroReallocate() throws IOException
+    {
+        try (DataOutputBufferSpy write = new DataOutputBufferSpy())
+        {
+            for (int ii = 0; ii < 1000000; ii++)
+            {
+                write.superReallocate(0);
+            }
+        }
+    }
+
+    @Test
     public void testDataOutputDirectByteBuffer() throws IOException
     {
         ByteBuffer buf = wrap(new byte[345], true);
@@ -102,6 +117,193 @@ public class DataOutputTest
         testRead(test, canon);
     }
 
+    private static class DataOutputBufferSpy extends DataOutputBuffer
+    {
+        Deque<Long> sizes = new ArrayDeque<>();
+
+        DataOutputBufferSpy()
+        {
+            sizes.offer(128L);
+        }
+
+        void publicFlush() throws IOException
+        {
+            //Request a single byte so growth follows the default policy instead of a specific count
+            doFlush(1);
+        }
+
+        void superReallocate(int count) throws IOException
+        {
+            super.reallocate(count);
+        }
+
+        @Override
+        protected void reallocate(long count)
+        {
+            if (count <= 0)
+                return;
+            Long lastSize = sizes.peekLast();
+            long newSize = calculateNewSize(count);
+            sizes.offer(newSize);
+            if (newSize > DataOutputBuffer.MAX_ARRAY_SIZE)
+                throw new RuntimeException();
+            if (newSize < 0)
+                throw new AssertionError();
+            if (lastSize != null && newSize <= lastSize)
+                throw new AssertionError();
+        }
+
+        @Override
+        protected long capacity()
+        {
+            return sizes.peekLast().intValue();
+        }
+    }
+
+    //Check for overflow at the max size, without actually allocating all the memory
+    @Test
+    public void testDataOutputBufferMaxSizeFake() throws IOException
+    {
+        try (DataOutputBufferSpy write = new DataOutputBufferSpy())
+        {
+            boolean threw = false;
+            try
+            {
+                while (true)
+                    write.publicFlush();
+            }
+            catch (RuntimeException e) {
+                if (e.getClass() == RuntimeException.class)
+                    threw = true;
+            }
+            Assert.assertTrue(threw);
+            Assert.assertTrue(write.sizes.peekLast() >= DataOutputBuffer.MAX_ARRAY_SIZE);
+        }
+    }
+
+    @Test
+    public void testDataOutputBufferMaxSize() throws IOException
+    {
+        //Need a lot of heap to run this test for real.
+        //Tested everything else as much as possible since we can't do it all the time
+        if (Runtime.getRuntime().maxMemory() < 5033164800L)
+            return;
+
+        try (DataOutputBuffer write = new DataOutputBuffer())
+        {
+            //Doesn't throw up to DataOutputBuffer.MAX_ARRAY_SIZE which is the array size limit in Java
+            for (int ii = 0; ii < DataOutputBuffer.MAX_ARRAY_SIZE / 8; ii++)
+                write.writeLong(0);
+            write.write(new byte[7]);
+
+            //Should fail due to validation
+            checkThrowsRuntimeException(validateReallocationCallable( write, DataOutputBuffer.MAX_ARRAY_SIZE + 1));
+            //Check that it does throw
+            checkThrowsRuntimeException(new Callable<Object>()
+            {
+                public Object call() throws Exception
+                {
+                    write.write(42);
+                    return null;
+                }
+            });
+        }
+    }
+
+    //Can't test it for real without tons of heap so test as much validation as possible
+    @Test
+    public void testDataOutputBufferBigReallocation() throws Exception
+    {
+        //Check saturating cast behavior
+        Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE + 1L));
+        Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE));
+        Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, DataOutputBuffer.saturatedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE - 1));
+        Assert.assertEquals(0, DataOutputBuffer.saturatedArraySizeCast(0));
+        Assert.assertEquals(1, DataOutputBuffer.saturatedArraySizeCast(1));
+        checkThrowsIAE(saturatedArraySizeCastCallable(-1));
+
+        //Check checked cast behavior
+        checkThrowsIAE(checkedArraySizeCastCallable(DataOutputBuffer.MAX_ARRAY_SIZE + 1L));
+        Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, DataOutputBuffer.checkedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE));
+        Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, DataOutputBuffer.checkedArraySizeCast(DataOutputBuffer.MAX_ARRAY_SIZE - 1));
+        Assert.assertEquals(0, DataOutputBuffer.checkedArraySizeCast(0));
+        Assert.assertEquals(1, DataOutputBuffer.checkedArraySizeCast(1));
+        checkThrowsIAE(checkedArraySizeCastCallable(-1));
+
+
+        try (DataOutputBuffer write = new DataOutputBuffer())
+        {
+            //Checked validation performed by DOB
+            Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE + 1L));
+            Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE));
+            Assert.assertEquals(DataOutputBuffer.MAX_ARRAY_SIZE - 1, write.validateReallocation(DataOutputBuffer.MAX_ARRAY_SIZE - 1));
+            checkThrowsRuntimeException(validateReallocationCallable( write, 0));
+            checkThrowsRuntimeException(validateReallocationCallable( write, 1));
+            checkThrowsIAE(validateReallocationCallable( write, -1));
+        }
+    }
+
+    Callable<Object> saturatedArraySizeCastCallable(final long value)
+    {
+        return new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                return DataOutputBuffer.saturatedArraySizeCast(value);
+            }
+        };
+    }
+
+    Callable<Object> checkedArraySizeCastCallable(final long value)
+    {
+        return new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                return DataOutputBuffer.checkedArraySizeCast(value);
+            }
+        };
+    }
+
+    Callable<Object> validateReallocationCallable(final DataOutputBuffer write, final long value)
+    {
+        return new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                return write.validateReallocation(value);
+            }
+        };
+    }
+
+    private static void checkThrowsIAE(Callable<Object> c)
+    {
+        checkThrowsException(c, IllegalArgumentException.class);
+    }
+
+    private static void checkThrowsRuntimeException(Callable<Object> c)
+    {
+        checkThrowsException(c, RuntimeException.class);
+    }
+
+    private static void checkThrowsException(Callable<Object> c, Class<?> exceptionClass)
+    {
+        boolean threw = false;
+        try
+        {
+            c.call();
+        }
+        catch (Throwable t)
+        {
+            if (t.getClass() == exceptionClass)
+                threw = true;
+        }
+        Assert.assertTrue(threw);
+    }
+
     @Test
     public void testSafeMemoryWriter() throws IOException
     {