You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 02:10:53 UTC

[flink] 01/03: [FLINK-13304][table-runtime-blink] Fix implementation of getString and getBinary method in NestedRow

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b62e22c40ea49936981ad3c8ccd0a2564bc88d86
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Jul 18 15:52:55 2019 +0800

    [FLINK-13304][table-runtime-blink] Fix implementation of getString and getBinary method in NestedRow
    
    This closes #9154
---
 .../apache/flink/table/dataformat/NestedRow.java   |  12 +-
 .../flink/table/dataformat/DataFormatTestUtil.java |  73 ++++++++++
 .../flink/table/dataformat/NestedRowTest.java      | 162 +++++++++++++++++++++
 3 files changed, 243 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index 1ba5592..36fcf44 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -25,8 +25,12 @@ import static org.apache.flink.table.dataformat.BinaryRow.calculateBitSetWidthIn
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * Its memory storage structure and {@link BinaryRow} exactly the same, the only different is it supports
- * all bytes in variable MemorySegments.
+ * Its memory storage structure is exactly the same as that of {@link BinaryRow}.
+ * The only difference is that, as {@link NestedRow} is used
+ * to store a row value in the variable-length part of {@link BinaryRow},
+ * every field (including both fixed-length part and variable-length part) of {@link NestedRow}
+ * may cross the boundary of a segment, while the fixed-length part of {@link BinaryRow}
+ * must fit into its first memory segment.
  */
 public final class NestedRow extends BinaryFormat implements BaseRow {
 
@@ -219,7 +223,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
 	public BinaryString getString(int pos) {
 		assertIndexIsValid(pos);
 		int fieldOffset = getFieldOffset(pos);
-		final long offsetAndLen = segments[0].getLong(fieldOffset);
+		final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset);
 		return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
 	}
 
@@ -247,7 +251,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
 	public byte[] getBinary(int pos) {
 		assertIndexIsValid(pos);
 		int fieldOffset = getFieldOffset(pos);
-		final long offsetAndLen = segments[0].getLong(fieldOffset);
+		final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset);
 		return readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
 	}
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
new file mode 100644
index 0000000..d89fa5b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.	See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	You may obtain a copy of the License at
+ *
+ *		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+/**
+ * Utils for testing data formats.
+ */
+class DataFormatTestUtil {
+
+	/**
+	 * Split the given byte array into two memory segments, leaving {@code baseOffset} unused bytes at the head of the first.
+	 */
+	static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
+		int newSize = (bytes.length + 1) / 2 + baseOffset;
+		MemorySegment[] ret = new MemorySegment[2];
+		ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
+		ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
+
+		ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
+		ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset));
+		return ret;
+	}
+
+	/**
+	 * A simple class for testing generic type getting / setting on data formats.
+	 */
+	static class MyObj {
+		public int i;
+		public double j;
+
+		MyObj(int i, double j) {
+			this.i = i;
+			this.j = j;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			MyObj myObj = (MyObj) o;
+
+			return i == myObj.i && Double.compare(myObj.j, j) == 0;
+		}
+
+		@Override
+		public String toString() {
+			return "MyObj{" + "i=" + i + ", j=" + j + '}';
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
new file mode 100644
index 0000000..78a6ba0
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.	See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	You may obtain a copy of the License at
+ *
+ *		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.splitBytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link NestedRow}s.
+ */
+public class NestedRowTest {
+
+	@Test
+	public void testNestedRowWithOneSegment() {
+		BinaryRow row = getBinaryRow();
+		GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
+		TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+
+		BaseRow nestedRow = row.getRow(0, 5);
+		assertEquals(nestedRow.getInt(0), 1);
+		assertEquals(nestedRow.getLong(1), 5L);
+		assertEquals(nestedRow.getString(2), BinaryString.fromString("12345678"));
+		assertTrue(nestedRow.isNullAt(3));
+		assertEquals(new MyObj(15, 5),
+			BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), genericSerializer));
+	}
+
+	@Test
+	public void testNestedRowWithMultipleSegments() {
+		BinaryRow row = getBinaryRow();
+		GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
+		TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+
+		MemorySegment[] segments = splitBytes(row.getSegments()[0].getHeapMemory(), 3);
+		row.pointTo(segments, 3, row.getSizeInBytes());
+		{
+			BaseRow nestedRow = row.getRow(0, 5);
+			assertEquals(nestedRow.getInt(0), 1);
+			assertEquals(nestedRow.getLong(1), 5L);
+			assertEquals(nestedRow.getString(2), BinaryString.fromString("12345678"));
+			assertTrue(nestedRow.isNullAt(3));
+			assertEquals(new MyObj(15, 5),
+				BinaryGeneric.getJavaObjectFromBinaryGeneric(nestedRow.getGeneric(4), genericSerializer));
+		}
+	}
+
+	@Test
+	public void testNestInNestedRow() {
+		// layer1
+		GenericRow gRow = new GenericRow(4);
+		gRow.setField(0, 1);
+		gRow.setField(1, 5L);
+		gRow.setField(2, BinaryString.fromString("12345678"));
+		gRow.setField(3, null);
+
+		// layer2
+		BaseRowSerializer serializer = new BaseRowSerializer(
+			new LogicalType[]{
+				DataTypes.INT().getLogicalType(),
+				DataTypes.BIGINT().getLogicalType(),
+				DataTypes.STRING().getLogicalType(),
+				DataTypes.STRING().getLogicalType()
+			},
+			new TypeSerializer[]{
+				IntSerializer.INSTANCE,
+				LongSerializer.INSTANCE,
+				StringSerializer.INSTANCE,
+				StringSerializer.INSTANCE
+			});
+		BinaryRow row = new BinaryRow(2);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+		writer.writeString(0, BinaryString.fromString("hahahahafff"));
+		writer.writeRow(1, gRow, serializer);
+		writer.complete();
+
+		// layer3
+		BinaryRow row2 = new BinaryRow(1);
+		BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+		writer2.writeRow(0, row, null);
+		writer2.complete();
+
+		// verify
+		{
+			NestedRow nestedRow = (NestedRow) row2.getRow(0, 2);
+			BinaryRow binaryRow = new BinaryRow(2);
+			binaryRow.pointTo(nestedRow.getSegments(), nestedRow.getOffset(),
+				nestedRow.getSizeInBytes());
+			assertEquals(binaryRow, row);
+		}
+
+		assertEquals(row2.getRow(0, 2).getString(0), BinaryString.fromString("hahahahafff"));
+		BaseRow nestedRow = row2.getRow(0, 2).getRow(1, 4);
+		assertEquals(nestedRow.getInt(0), 1);
+		assertEquals(nestedRow.getLong(1), 5L);
+		assertEquals(nestedRow.getString(2), BinaryString.fromString("12345678"));
+		assertTrue(nestedRow.isNullAt(3));
+	}
+
+	private BinaryRow getBinaryRow() {
+		BinaryRow row = new BinaryRow(1);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+
+		GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
+		TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+		GenericRow gRow = new GenericRow(5);
+		gRow.setField(0, 1);
+		gRow.setField(1, 5L);
+		gRow.setField(2, BinaryString.fromString("12345678"));
+		gRow.setField(3, null);
+		gRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5), genericSerializer));
+
+		BaseRowSerializer serializer = new BaseRowSerializer(
+			new LogicalType[]{
+				DataTypes.INT().getLogicalType(),
+				DataTypes.BIGINT().getLogicalType(),
+				DataTypes.STRING().getLogicalType(),
+				DataTypes.STRING().getLogicalType(),
+				DataTypes.ANY(info).getLogicalType()
+			},
+			new TypeSerializer[]{
+				IntSerializer.INSTANCE,
+				LongSerializer.INSTANCE,
+				StringSerializer.INSTANCE,
+				StringSerializer.INSTANCE,
+				genericSerializer
+			});
+		writer.writeRow(0, gRow, serializer);
+		writer.complete();
+
+		return row;
+	}
+}