You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/07 17:33:20 UTC

[flink] 03/04: [FLINK-12216][runtime] Respect the number of bytes from input parameters in HybridMemorySegment

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 534835b0386c767116708f7986dfb759e377dcaf
Author: liyafan82 <fa...@foxmail.com>
AuthorDate: Wed Apr 17 14:34:09 2019 +0800

    [FLINK-12216][runtime] Respect the number of bytes from input parameters in HybridMemorySegment
    
    This also moves the "isReadOnly" check ahead of the buffer-type branches, so that a read-only
    target still fails with ReadOnlyBufferException and the error reporting stays consistent
    between all MemorySegment implementations.
    
    This closes #8194
---
 .../flink/core/memory/HybridMemorySegment.java     | 17 +++++------
 .../core/memory/HybridOnHeapMemorySegmentTest.java | 33 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
index c8889f5..3404bc1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@@ -296,6 +296,9 @@ public final class HybridMemorySegment extends MemorySegment {
 		if ((offset | numBytes | (offset + numBytes)) < 0) {
 			throw new IndexOutOfBoundsException();
 		}
+		if (target.isReadOnly()) {
+			throw new ReadOnlyBufferException();
+		}
 
 		final int targetOffset = target.position();
 		final int remaining = target.remaining();
@@ -305,10 +308,6 @@ public final class HybridMemorySegment extends MemorySegment {
 		}
 
 		if (target.isDirect()) {
-			if (target.isReadOnly()) {
-				throw new ReadOnlyBufferException();
-			}
-
 			// copy to the target memory directly
 			final long targetPointer = getAddress(target) + targetOffset;
 			final long sourcePointer = address + offset;
@@ -333,10 +332,8 @@ public final class HybridMemorySegment extends MemorySegment {
 			target.position(targetOffset + numBytes);
 		}
 		else {
-			// neither heap buffer nor direct buffer
-			while (target.hasRemaining()) {
-				target.put(get(offset++));
-			}
+			// other types of byte buffers
+			throw new IllegalArgumentException("The target buffer is not direct, and has no array.");
 		}
 	}
 
@@ -379,8 +376,8 @@ public final class HybridMemorySegment extends MemorySegment {
 			source.position(sourceOffset + numBytes);
 		}
 		else {
-			// neither heap buffer nor direct buffer
-			while (source.hasRemaining()) {
+			// other types of byte buffers
+			for (int i = 0; i < numBytes; i++) {
 				put(offset++, source.get());
 			}
 		}
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOnHeapMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOnHeapMemorySegmentTest.java
index 7cfc301..9f725ee 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOnHeapMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOnHeapMemorySegmentTest.java
@@ -77,4 +77,37 @@ public class HybridOnHeapMemorySegmentTest extends MemorySegmentTestBase {
 		assertEquals(3, buf2.position());
 		assertEquals(7, buf2.limit());
 	}
+
+	@Test
+	public void testReadOnlyByteBufferPut() {
+		final byte[] buffer = new byte[100];
+		HybridMemorySegment seg = new HybridMemorySegment(buffer, null);
+
+		String content = "hello world";
+		ByteBuffer bb = ByteBuffer.allocate(20);
+		bb.put(content.getBytes());
+		bb.rewind();
+
+		int offset = 10;
+		int numBytes = 5;
+
+		ByteBuffer readOnlyBuf = bb.asReadOnlyBuffer();
+		assertFalse(readOnlyBuf.isDirect());
+		assertFalse(readOnlyBuf.hasArray());
+
+		seg.put(offset, readOnlyBuf, numBytes);
+
+		// verify the area before the written region.
+		for (int i = 0; i < offset; i++) {
+			assertEquals(0, buffer[i]);
+		}
+
+		// verify the region that is written.
+		assertEquals("hello", new String(buffer, offset, numBytes));
+
+		// verify the area after the written region.
+		for (int i = offset + numBytes; i < buffer.length; i++) {
+			assertEquals(0, buffer[i]);
+		}
+	}
 }