You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 02:10:53 UTC
[flink] 01/03: [FLINK-13304][table-runtime-blink] Fix
implementation of getString and getBinary method in NestedRow
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b62e22c40ea49936981ad3c8ccd0a2564bc88d86
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Jul 18 15:52:55 2019 +0800
[FLINK-13304][table-runtime-blink] Fix implementation of getString and getBinary method in NestedRow
This closes #9154
---
.../apache/flink/table/dataformat/NestedRow.java | 12 +-
.../flink/table/dataformat/DataFormatTestUtil.java | 73 ++++++++++
.../flink/table/dataformat/NestedRowTest.java | 162 +++++++++++++++++++++
3 files changed, 243 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index 1ba5592..36fcf44 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -25,8 +25,12 @@ import static org.apache.flink.table.dataformat.BinaryRow.calculateBitSetWidthIn
import static org.apache.flink.util.Preconditions.checkArgument;
/**
- * Its memory storage structure and {@link BinaryRow} exactly the same, the only different is it supports
- * all bytes in variable MemorySegments.
+ * Its memory storage structure is exactly the same as that of {@link BinaryRow}.
+ * The only difference is that, as {@link NestedRow} is used
+ * to store row values in the variable-length part of {@link BinaryRow},
+ * every field (including both fixed-length part and variable-length part) of {@link NestedRow}
+ * may cross the boundary of a segment, while the fixed-length part of {@link BinaryRow}
+ * must fit into its first memory segment.
*/
public final class NestedRow extends BinaryFormat implements BaseRow {
@@ -219,7 +223,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
public BinaryString getString(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
- final long offsetAndLen = segments[0].getLong(fieldOffset);
+ final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset);
return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
}
@@ -247,7 +251,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
public byte[] getBinary(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
- final long offsetAndLen = segments[0].getLong(fieldOffset);
+ final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset);
return readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
new file mode 100644
index 0000000..d89fa5b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+/**
+ * Utils for testing data formats.
+ */
+class DataFormatTestUtil {
+
+ /**
+ * Split the given byte array into two memory segments.
+ */
+ static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
+ int newSize = (bytes.length + 1) / 2 + baseOffset;
+ MemorySegment[] ret = new MemorySegment[2];
+ ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
+ ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
+
+ ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
+ ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset));
+ return ret;
+ }
+
+ /**
+  * A simple class for testing generic type getting / setting on data formats.
+  *
+  * <p>Equality is value-based on both fields, and {@link #hashCode()} is kept
+  * consistent with {@link #equals(Object)} so that equal instances also behave
+  * correctly in hash-based collections.
+  */
+ static class MyObj {
+     public int i;
+     public double j;
+
+     MyObj(int i, double j) {
+         this.i = i;
+         this.j = j;
+     }
+
+     @Override
+     public boolean equals(Object o) {
+         if (this == o) {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass()) {
+             return false;
+         }
+
+         MyObj myObj = (MyObj) o;
+
+         // Double.compare treats NaN as equal to itself and distinguishes 0.0 from -0.0.
+         return i == myObj.i && Double.compare(myObj.j, j) == 0;
+     }
+
+     @Override
+     public int hashCode() {
+         // Double.hashCode is based on doubleToLongBits, which matches the
+         // Double.compare-based comparison used in equals.
+         return 31 * Integer.hashCode(i) + Double.hashCode(j);
+     }
+
+     @Override
+     public String toString() {
+         return "MyObj{" + "i=" + i + ", j=" + j + '}';
+     }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
new file mode 100644
index 0000000..78a6ba0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.splitBytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link NestedRow}s.
+ */
+public class NestedRowTest {
+
+ /**
+  * Reads every field of the nested row stored in the row built by {@link #getBinaryRow()},
+  * without re-pointing that row to other memory segments.
+  */
+ @Test
+ public void testNestedRowWithOneSegment() {
+     BinaryRow row = getBinaryRow();
+     GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
+     TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+
+     BaseRow nestedRow = row.getRow(0, 5);
+     // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+     assertEquals(1, nestedRow.getInt(0));
+     assertEquals(5L, nestedRow.getLong(1));
+     assertEquals(BinaryString.fromString("12345678"), nestedRow.getString(2));
+     assertTrue(nestedRow.isNullAt(3));
+     assertEquals(new MyObj(15, 5),
+         BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), genericSerializer));
+ }
+
+ /**
+  * Reads every field of the nested row after the bytes of the enclosing row have been
+  * redistributed over two memory segments, with the row starting at offset 3.
+  */
+ @Test
+ public void testNestedRowWithMultipleSegments() {
+     BinaryRow row = getBinaryRow();
+     GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
+     TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+
+     // Spread the row's bytes over two segments and make the row read from them.
+     MemorySegment[] segments = splitBytes(row.getSegments()[0].getHeapMemory(), 3);
+     row.pointTo(segments, 3, row.getSizeInBytes());
+
+     BaseRow nestedRow = row.getRow(0, 5);
+     // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+     assertEquals(1, nestedRow.getInt(0));
+     assertEquals(5L, nestedRow.getLong(1));
+     assertEquals(BinaryString.fromString("12345678"), nestedRow.getString(2));
+     assertTrue(nestedRow.isNullAt(3));
+     assertEquals(new MyObj(15, 5),
+         BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), genericSerializer));
+ }
+
+ /**
+  * Nests rows three levels deep (a generic row inside a binary row inside another binary row)
+  * and checks that the inner levels can be read back from the outermost row.
+  */
+ @Test
+ public void testNestInNestedRow() {
+     // layer1
+     GenericRow gRow = new GenericRow(4);
+     gRow.setField(0, 1);
+     gRow.setField(1, 5L);
+     gRow.setField(2, BinaryString.fromString("12345678"));
+     gRow.setField(3, null);
+
+     // layer2
+     BaseRowSerializer serializer = new BaseRowSerializer(
+         new LogicalType[]{
+             DataTypes.INT().getLogicalType(),
+             DataTypes.BIGINT().getLogicalType(),
+             DataTypes.STRING().getLogicalType(),
+             DataTypes.STRING().getLogicalType()
+         },
+         new TypeSerializer[]{
+             IntSerializer.INSTANCE,
+             LongSerializer.INSTANCE,
+             StringSerializer.INSTANCE,
+             StringSerializer.INSTANCE
+         });
+     BinaryRow row = new BinaryRow(2);
+     BinaryRowWriter writer = new BinaryRowWriter(row);
+     writer.writeString(0, BinaryString.fromString("hahahahafff"));
+     writer.writeRow(1, gRow, serializer);
+     writer.complete();
+
+     // layer3
+     BinaryRow row2 = new BinaryRow(1);
+     BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+     // NOTE(review): no serializer is passed here; presumably none is needed because
+     // the value is already a BinaryRow - confirm against BinaryRowWriter.writeRow.
+     writer2.writeRow(0, row, null);
+     writer2.complete();
+
+     // verify: the bytes of the layer-2 nested row must equal the original layer-2 row
+     {
+         NestedRow nestedRow = (NestedRow) row2.getRow(0, 2);
+         BinaryRow binaryRow = new BinaryRow(2);
+         binaryRow.pointTo(nestedRow.getSegments(), nestedRow.getOffset(),
+             nestedRow.getSizeInBytes());
+         // assertEquals takes (expected, actual); the originally written row is the expectation.
+         assertEquals(row, binaryRow);
+     }
+
+     assertEquals(BinaryString.fromString("hahahahafff"), row2.getRow(0, 2).getString(0));
+     BaseRow nestedRow = row2.getRow(0, 2).getRow(1, 4);
+     assertEquals(1, nestedRow.getInt(0));
+     assertEquals(5L, nestedRow.getLong(1));
+     assertEquals(BinaryString.fromString("12345678"), nestedRow.getString(2));
+     assertTrue(nestedRow.isNullAt(3));
+ }
+
+ /**
+  * Builds a single-field {@link BinaryRow} whose only field is a nested row with five fields:
+  * an int, a long, a string, a null and a {@link BinaryGeneric} wrapping a {@link MyObj}.
+  *
+  * @return the completed row containing the nested row at position 0
+  */
+ private BinaryRow getBinaryRow() {
+     BinaryRow outerRow = new BinaryRow(1);
+     BinaryRowWriter outerWriter = new BinaryRowWriter(outerRow);
+
+     GenericTypeInfo<MyObj> myObjTypeInfo = new GenericTypeInfo<>(MyObj.class);
+     TypeSerializer<MyObj> myObjSerializer = myObjTypeInfo.createSerializer(new ExecutionConfig());
+
+     // Values of the nested row, in field order.
+     GenericRow innerRow = new GenericRow(5);
+     innerRow.setField(0, 1);
+     innerRow.setField(1, 5L);
+     innerRow.setField(2, BinaryString.fromString("12345678"));
+     innerRow.setField(3, null);
+     innerRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5), myObjSerializer));
+
+     // Logical types and serializers of the nested row, one entry per field.
+     LogicalType[] fieldTypes = new LogicalType[]{
+         DataTypes.INT().getLogicalType(),
+         DataTypes.BIGINT().getLogicalType(),
+         DataTypes.STRING().getLogicalType(),
+         DataTypes.STRING().getLogicalType(),
+         DataTypes.ANY(myObjTypeInfo).getLogicalType()
+     };
+     TypeSerializer[] fieldSerializers = new TypeSerializer[]{
+         IntSerializer.INSTANCE,
+         LongSerializer.INSTANCE,
+         StringSerializer.INSTANCE,
+         StringSerializer.INSTANCE,
+         myObjSerializer
+     };
+     BaseRowSerializer innerRowSerializer = new BaseRowSerializer(fieldTypes, fieldSerializers);
+
+     outerWriter.writeRow(0, innerRow, innerRowSerializer);
+     outerWriter.complete();
+
+     return outerRow;
+ }
+}