You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 16:03:14 UTC

flink git commit: [runtime] Slight improvements and tests in the Buffer class

Repository: flink
Updated Branches:
  refs/heads/master 798823597 -> 55db268f0


[runtime] Slight improvements and tests in the Buffer class


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55db268f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55db268f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55db268f

Branch: refs/heads/master
Commit: 55db268f0869fdbbc38454aea351c838515d4a55
Parents: 7988235
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 20 16:02:10 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 20 16:02:10 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/io/network/buffer/Buffer.java |  8 +++++---
 .../runtime/io/network/buffer/BufferTest.java   | 20 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55db268f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 2642521..23c7ed0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.memory.MemorySegment;
 
 import java.nio.ByteBuffer;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -86,6 +85,7 @@ public class Buffer {
 		synchronized (recycleLock) {
 			ensureNotRecycled();
 
+			// return a duplicate so each caller gets its own position/limit (thread-safety); the bytes are still shared
 			return memorySegment.wrap(0, currentSize).duplicate();
 		}
 	}
@@ -104,8 +104,10 @@ public class Buffer {
 		synchronized (recycleLock) {
 			ensureNotRecycled();
 
-			checkArgument(newSize >= 0 && newSize <= memorySegment.size(), "Size of buffer must be >= 0 and <= " +
-					memorySegment.size() + ", but was " + newSize + ".");
+			if (newSize < 0 || newSize > memorySegment.size()) {
+				throw new IllegalArgumentException("Size of buffer must be >= 0 and <= " +
+													memorySegment.size() + ", but was " + newSize + ".");
+			}
 
 			currentSize = newSize;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/55db268f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index f2f9c09..734dcfb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -23,6 +23,11 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 public class BufferTest {
 
 	@Test
@@ -51,4 +56,19 @@ public class BufferTest {
 		}
 	}
 
+	@Test
+	public void testgetNioBufferThreadSafe() {
+		final MemorySegment segment = new MemorySegment(new byte[1024]);
+		final BufferRecycler recycler = Mockito.mock(BufferRecycler.class);
+
+		Buffer buffer = new Buffer(segment, recycler);
+
+		ByteBuffer buf1 = buffer.getNioBuffer();
+		ByteBuffer buf2 = buffer.getNioBuffer();
+
+		assertNotNull(buf1);
+		assertNotNull(buf2);
+
+		assertTrue("Repeated call to getNioBuffer() returns the same nio buffer", buf1 != buf2);
+	}
 }