You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/04 06:56:24 UTC

[flink] 02/02: [FLINK-11775][table-runtime-blink] Use MemorySegmentWritable to BinaryRowSerializer

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f54f7742841eb775256ae44f383e6db34a6363ee
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jun 18 15:49:06 2019 +0800

    [FLINK-11775][table-runtime-blink] Use MemorySegmentWritable to BinaryRowSerializer
    
    This closes #8775
---
 .../table/runtime/hashtable/LongHashPartition.java |  4 +--
 .../flink/table/typeutils/BinaryRowSerializer.java | 38 +++++++++++-----------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
index fa39a84..7d54945 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
@@ -622,7 +622,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl
 			if (row.getSegments().length == 1) {
 				buildSideWriteBuffer.write(row.getSegments()[0], row.getOffset(), sizeInBytes);
 			} else {
-				buildSideSerializer.serializeToPagesWithoutLength(row, buildSideWriteBuffer);
+				buildSideSerializer.serializeWithoutLengthSlow(row, buildSideWriteBuffer);
 			}
 		} else {
 			serializeToPages(row);
@@ -642,7 +642,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl
 		if (row.getSegments().length == 1) {
 			buildSideWriteBuffer.write(row.getSegments()[0], row.getOffset(), sizeInBytes);
 		} else {
-			buildSideSerializer.serializeToPagesWithoutLength(row, buildSideWriteBuffer);
+			buildSideSerializer.serializeWithoutLengthSlow(row, buildSideWriteBuffer);
 		}
 	}
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
index 0851021..9804767 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemorySegmentWritable;
 import org.apache.flink.runtime.memory.AbstractPagedInputView;
 import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.table.dataformat.BinaryRow;
@@ -83,12 +84,13 @@ public class BinaryRowSerializer extends AbstractRowSerializer<BinaryRow> {
 	@Override
 	public void serialize(BinaryRow record, DataOutputView target) throws IOException {
 		target.writeInt(record.getSizeInBytes());
-		SegmentsUtil.copyToView(
-				record.getSegments(),
-				record.getOffset(),
-				record.getSizeInBytes(),
-				target
-		);
+		if (target instanceof MemorySegmentWritable) {
+			serializeWithoutLength(record, (MemorySegmentWritable) target);
+		} else {
+			SegmentsUtil.copyToView(
+					record.getSegments(), record.getOffset(),
+					record.getSizeInBytes(), target);
+		}
 	}
 
 	@Override
@@ -133,25 +135,23 @@ public class BinaryRowSerializer extends AbstractRowSerializer<BinaryRow> {
 			BinaryRow record,
 			AbstractPagedOutputView headerLessView) throws IOException {
 		checkArgument(headerLessView.getHeaderLength() == 0);
-		int sizeInBytes = record.getSizeInBytes();
 		int skip = checkSkipWriteForFixLengthPart(headerLessView);
+		headerLessView.writeInt(record.getSizeInBytes());
+		serializeWithoutLength(record, headerLessView);
+		return skip;
+	}
+
+	private static void serializeWithoutLength(
+			BinaryRow record, MemorySegmentWritable writable) throws IOException {
 		if (record.getSegments().length == 1) {
-			headerLessView.writeInt(sizeInBytes);
-			headerLessView.write(record.getSegments()[0], record.getOffset(), sizeInBytes);
+			writable.write(record.getSegments()[0], record.getOffset(), record.getSizeInBytes());
 		} else {
-			headerLessView.writeInt(record.getSizeInBytes());
-			serializeToPagesWithoutLength(record, headerLessView);
+			serializeWithoutLengthSlow(record, writable);
 		}
-		return skip;
 	}
 
-	/**
-	 * Serialize row to pages without row length. The caller should make sure that the fixed-length
-	 * parit can fit in output's current segment, no skip check will be done here.
-	 */
-	public void serializeToPagesWithoutLength(
-			BinaryRow record,
-			AbstractPagedOutputView out) throws IOException {
+	public static void serializeWithoutLengthSlow(
+			BinaryRow record, MemorySegmentWritable out) throws IOException {
 		int remainSize = record.getSizeInBytes();
 		int posInSegOfRecord = record.getOffset();
 		int segmentSize = record.getSegments()[0].size();