Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/15 07:16:04 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #12103: [FLINK-16998][core] Add a changeflag to Row

godfreyhe commented on a change in pull request #12103:
URL: https://github.com/apache/flink/pull/12103#discussion_r425606156



##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
##########
@@ -57,8 +62,8 @@
 	private final boolean invertNormKey;
 
 	// null masks for serialized comparison

Review comment:
       Should this comment also be updated?

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
##########
@@ -286,6 +286,21 @@ public String toString() {
 		return bld.toString();
 	}
 
+	/**
+	 * Creates a serializer for the old {@link Row} format before Flink 1.11.
+	 *
+	 * <p>The serialization format has changed from 1.10 to 1.11 and added {@link Row#getKind()}.
+	 */
+	@Deprecated
+	public TypeSerializer<Row> createLegacySerializer(ExecutionConfig config) {

Review comment:
       Is this in preparation for FLINK-17520? I notice that this method is currently only used in tests.
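
       For context, a minimal sketch of how the deprecated method would be called, mirroring what the new LegacyRowSerializerTest below does (no API beyond the quoted diff is assumed):

       ```java
       import org.apache.flink.api.common.ExecutionConfig;
       import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
       import org.apache.flink.api.common.typeutils.TypeSerializer;
       import org.apache.flink.api.java.typeutils.RowTypeInfo;
       import org.apache.flink.types.Row;

       class LegacySerializerSketch {
           // Obtains a serializer that uses the old (pre-1.11) Row format.
           static TypeSerializer<Row> legacySerializer() {
               RowTypeInfo typeInfo = new RowTypeInfo(
                   BasicTypeInfo.INT_TYPE_INFO,
                   BasicTypeInfo.STRING_TYPE_INFO);
               return typeInfo.createLegacySerializer(new ExecutionConfig());
           }
       }
       ```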

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -28,35 +28,56 @@
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Arrays;
+import java.util.Objects;
 
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoAndCopyMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.writeMask;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Serializer for {@link Row}.
+ *
+ * <p>It uses the following serialization format:
+ * <pre>
+ *     |bitmask|field|field|....
+ * </pre>
+ * The bitmask consist of {@link #ROW_KIND_OFFSET} bits for encoding the {@link RowKind}
+ * and n bits for whether a field is null.
  */
 @Internal
 public final class RowSerializer extends TypeSerializer<Row> {
 
-	private static final long serialVersionUID = 1L;
+	public static final int ROW_KIND_OFFSET = 2;
+
+	private static final long serialVersionUID = 2L;
+
+	private final boolean legacyModeEnabled;
+
+	private final int legacyOffset;
 
 	private final TypeSerializer<Object>[] fieldSerializers;
 
 	private final int arity;
 
-	private transient boolean[] nullMask;
+	private final boolean[] mask;
+

Review comment:
       nit: redundant empty lines
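
       For illustration only: a sketch inferred from the quoted fields and Javadoc (not the actual RowSerializer implementation) of how the combined mask could be populated in non-legacy mode. Row#getKind() comes from this PR; the RowKind-to-byte encoding via toByteValue() is an assumption:

       ```java
       import org.apache.flink.types.Row;
       import org.apache.flink.types.RowKind;

       class MaskSketch {
           static final int ROW_KIND_OFFSET = 2;

           // First ROW_KIND_OFFSET bits encode the RowKind, the remaining bits
           // flag which fields are null (one bit per field).
           static boolean[] buildMask(Row row, int arity) {
               boolean[] mask = new boolean[ROW_KIND_OFFSET + arity];
               RowKind kind = row.getKind();
               byte kindByte = kind.toByteValue(); // assumption: RowKind exposes a byte value
               mask[0] = (kindByte & 0x01) != 0;
               mask[1] = (kindByte & 0x02) != 0;
               for (int i = 0; i < arity; i++) {
                   mask[ROW_KIND_OFFSET + i] = row.getField(i) == null;
               }
               return mask;
           }
       }
       ```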

##########
File path: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/LegacyRowSerializerTest.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+
+/**
+ * Tests for the old serialization format of {@link Row} before Flink 1.11.
+ */
+public class LegacyRowSerializerTest {
+
+	@Test
+	public void testRowSerializer() {
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+		Row row1 = new Row(2);
+		row1.setField(0, 1);
+		row1.setField(1, "a");
+
+		Row row2 = new Row(2);
+		row2.setField(0, 2);
+		row2.setField(1, null);
+
+		TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
+		RowSerializerTestInstance instance = new RowSerializerTestInstance(serializer, row1, row2);
+		instance.testAll();
+	}
+
+	@Test
+	public void testLargeRowSerializer() {
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		Row row = new Row(13);
+		row.setField(0, 2);
+		row.setField(1, null);
+		row.setField(3, null);
+		row.setField(4, null);
+		row.setField(5, null);
+		row.setField(6, null);
+		row.setField(7, null);
+		row.setField(8, null);
+		row.setField(9, null);
+		row.setField(10, null);
+		row.setField(11, null);
+		row.setField(12, "Test");
+
+		TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
+		RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, row);
+		testInstance.testAll();
+	}
+
+	@Test
+	public void testRowSerializerWithComplexTypes() {
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.BOOLEAN_TYPE_INFO,
+				BasicTypeInfo.SHORT_TYPE_INFO),
+			TypeExtractor.createTypeInfo(MyPojo.class));
+
+		MyPojo testPojo1 = new MyPojo();
+		testPojo1.name = null;
+		MyPojo testPojo2 = new MyPojo();
+		testPojo2.name = "Test1";
+		MyPojo testPojo3 = new MyPojo();
+		testPojo3.name = "Test2";
+
+		Row[] data = new Row[]{
+			createRow(null, null, null, null, null),
+			createRow(0, null, null, null, null),
+			createRow(0, 0.0, null, null, null),
+			createRow(0, 0.0, "a", null, null),
+			createRow(1, 0.0, "a", null, null),
+			createRow(1, 1.0, "a", null, null),
+			createRow(1, 1.0, "b", null, null),
+			createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+			createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+		};
+
+		TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
+		RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, data);
+		testInstance.testAll();
+	}
+
+	// ----------------------------------------------------------------------------------------------
+
+	private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
+		Row row = new Row(5);
+		row.setField(0, f0);
+		row.setField(1, f1);
+		row.setField(2, f2);
+		row.setField(3, f3);
+		row.setField(4, f4);
+		return row;
+	}
+
+	private class RowSerializerTestInstance extends SerializerTestInstance<Row> {
+
+		RowSerializerTestInstance(
+			TypeSerializer<Row> serializer,
+			Row... testData) {
+			super(serializer, Row.class, -1, testData);
+		}
+	}
+
+	public static class MyPojo implements Serializable, Comparable<MyPojo> {
+		public String name = null;
+
+		@Override
+		public int compareTo(MyPojo o) {
+			if (name == null && o.name == null) {
+				return 0;
+			} else if (name == null) {
+				return -1;
+			} else if (o.name == null) {
+				return 1;
+			} else {
+				return name.compareTo(o.name);
+			}
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			MyPojo myPojo = (MyPojo) o;
+
+			return name != null ? name.equals(myPojo.name) : myPojo.name == null;

Review comment:
       This can be simplified to `Objects.equals(name, myPojo.name)`.
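
       For reference, the simplified method would read as follows (it needs an import of `java.util.Objects` in this test class):

       ```java
       @Override
       public boolean equals(Object o) {
           if (this == o) {
               return true;
           }
           if (o == null || getClass() != o.getClass()) {
               return false;
           }
           MyPojo myPojo = (MyPojo) o;
           return Objects.equals(name, myPojo.name);
       }
       ```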

##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
##########
@@ -28,35 +28,56 @@
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Arrays;
+import java.util.Objects;
 
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoAndCopyMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.writeMask;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Serializer for {@link Row}.
+ *
+ * <p>It uses the following serialization format:
+ * <pre>
+ *     |bitmask|field|field|....
+ * </pre>
+ * The bitmask consist of {@link #ROW_KIND_OFFSET} bits for encoding the {@link RowKind}
+ * and n bits for whether a field is null.

Review comment:
       Could we also document the `bitmask` format for both cases, i.e. when `legacyModeEnabled` is true and when it is false?
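
       For reference, a hedged sketch of the two layouts implied by the diff (legacy mode keeps the pre-1.11 null-only mask, the new mode prepends the RowKind bits); the exact Javadoc wording is of course up to the author:

       ```java
       class BitmaskLayoutSketch {
           // Assumed layouts, not verified against the final implementation:
           //   legacyModeEnabled == true  (pre-1.11): | null(f0) | null(f1) | ... | null(f(n-1)) |
           //   legacyModeEnabled == false (1.11+):    | kind bit 0 | kind bit 1 | null(f0) | ... | null(f(n-1)) |
           static int maskLength(int arity, boolean legacyModeEnabled) {
               final int ROW_KIND_OFFSET = 2; // as declared in the new RowSerializer
               return legacyModeEnabled ? arity : ROW_KIND_OFFSET + arity;
           }
       }
       ```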




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org