You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/04 06:56:24 UTC
[flink] 02/02: [FLINK-11775][table-runtime-blink] Use
MemorySegmentWritable to BinaryRowSerializer
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f54f7742841eb775256ae44f383e6db34a6363ee
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Jun 18 15:49:06 2019 +0800
[FLINK-11775][table-runtime-blink] Use MemorySegmentWritable to BinaryRowSerializer
This closes #8775
---
.../table/runtime/hashtable/LongHashPartition.java | 4 +--
.../flink/table/typeutils/BinaryRowSerializer.java | 38 +++++++++++-----------
2 files changed, 21 insertions(+), 21 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
index fa39a84..7d54945 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
@@ -622,7 +622,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl
if (row.getSegments().length == 1) {
buildSideWriteBuffer.write(row.getSegments()[0], row.getOffset(), sizeInBytes);
} else {
- buildSideSerializer.serializeToPagesWithoutLength(row, buildSideWriteBuffer);
+ buildSideSerializer.serializeWithoutLengthSlow(row, buildSideWriteBuffer);
}
} else {
serializeToPages(row);
@@ -642,7 +642,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl
if (row.getSegments().length == 1) {
buildSideWriteBuffer.write(row.getSegments()[0], row.getOffset(), sizeInBytes);
} else {
- buildSideSerializer.serializeToPagesWithoutLength(row, buildSideWriteBuffer);
+ buildSideSerializer.serializeWithoutLengthSlow(row, buildSideWriteBuffer);
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
index 0851021..9804767 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.memory.MemorySegmentWritable;
import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.dataformat.BinaryRow;
@@ -83,12 +84,13 @@ public class BinaryRowSerializer extends AbstractRowSerializer<BinaryRow> {
@Override
public void serialize(BinaryRow record, DataOutputView target) throws IOException {
target.writeInt(record.getSizeInBytes());
- SegmentsUtil.copyToView(
- record.getSegments(),
- record.getOffset(),
- record.getSizeInBytes(),
- target
- );
+ if (target instanceof MemorySegmentWritable) {
+ serializeWithoutLength(record, (MemorySegmentWritable) target);
+ } else {
+ SegmentsUtil.copyToView(
+ record.getSegments(), record.getOffset(),
+ record.getSizeInBytes(), target);
+ }
}
@Override
@@ -133,25 +135,23 @@ public class BinaryRowSerializer extends AbstractRowSerializer<BinaryRow> {
BinaryRow record,
AbstractPagedOutputView headerLessView) throws IOException {
checkArgument(headerLessView.getHeaderLength() == 0);
- int sizeInBytes = record.getSizeInBytes();
int skip = checkSkipWriteForFixLengthPart(headerLessView);
+ headerLessView.writeInt(record.getSizeInBytes());
+ serializeWithoutLength(record, headerLessView);
+ return skip;
+ }
+
+ private static void serializeWithoutLength(
+ BinaryRow record, MemorySegmentWritable writable) throws IOException {
if (record.getSegments().length == 1) {
- headerLessView.writeInt(sizeInBytes);
- headerLessView.write(record.getSegments()[0], record.getOffset(), sizeInBytes);
+ writable.write(record.getSegments()[0], record.getOffset(), record.getSizeInBytes());
} else {
- headerLessView.writeInt(record.getSizeInBytes());
- serializeToPagesWithoutLength(record, headerLessView);
+ serializeWithoutLengthSlow(record, writable);
}
- return skip;
}
- /**
- * Serialize row to pages without row length. The caller should make sure that the fixed-length
- * parit can fit in output's current segment, no skip check will be done here.
- */
- public void serializeToPagesWithoutLength(
- BinaryRow record,
- AbstractPagedOutputView out) throws IOException {
+ public static void serializeWithoutLengthSlow(
+ BinaryRow record, MemorySegmentWritable out) throws IOException {
int remainSize = record.getSizeInBytes();
int posInSegOfRecord = record.getOffset();
int segmentSize = record.getSegments()[0].size();