You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/15 10:49:54 UTC
[4/7] flink git commit: [FLINK-5187] [core] Port Row and related type
utils to Java and move them to flink-core.
[FLINK-5187] [core] Port Row and related type utils to Java and move them to flink-core.
This closes #2968.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f8a255
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f8a255
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f8a255
Branch: refs/heads/master
Commit: 86f8a255db6ce2ff9e09c2824e85c4930427ecdb
Parents: 15e7f0a
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Dec 8 22:44:29 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 15 11:36:40 2016 +0100
----------------------------------------------------------------------
docs/dev/types_serialization.md | 6 +-
.../flink/api/java/typeutils/RowTypeInfo.java | 203 ++++++++
.../typeutils/runtime/NullAwareComparator.java | 240 +++++++++
.../java/typeutils/runtime/NullMaskUtils.java | 105 ++++
.../java/typeutils/runtime/RowComparator.java | 488 +++++++++++++++++++
.../java/typeutils/runtime/RowSerializer.java | 243 +++++++++
.../main/java/org/apache/flink/types/Row.java | 116 +++++
.../api/java/typeutils/RowTypeInfoTest.java | 69 +++
.../typeutils/runtime/RowComparatorTest.java | 156 ++++++
.../RowComparatorWithManyFieldsTests.java | 103 ++++
.../typeutils/runtime/RowSerializerTest.java | 197 ++++++++
.../java/org/apache/flink/types/RowTest.java | 37 ++
12 files changed, 1961 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index ea02df0..2b43563 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -87,9 +87,11 @@ Internally, Flink makes the following distinctions between types:
* Composite types
- * Flink Java Tuples (part of the Flink Java API)
+ * Flink Java Tuples (part of the Flink Java API): max 25 fields, null fields not supported
- * Scala *case classes* (including Scala tuples)
+ * Scala *case classes* (including Scala tuples): max 22 fields, null fields not supported
+
+ * Row: tuples with an arbitrary number of fields and support for null fields
* POJOs: classes that follow a certain bean-like pattern
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
new file mode 100644
index 0000000..03cbe61
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link TypeInformation} for {@link Row}.
+ *
+ * <p>Every field gets a generated name of the form {@code f<index>} ({@code f0}, {@code f1}, ...).
+ */
+@PublicEvolving
+public class RowTypeInfo extends TupleTypeInfoBase<Row> {
+
+    private static final long serialVersionUID = 9158518989896601963L;
+
+    /** Generated field names, one per field type, in field order. */
+    protected final String[] fieldNames;
+
+    /**
+     * Temporary variable for directly passing orders to comparators. It is set only for the
+     * duration of {@link #createComparator(int[], boolean[], int, ExecutionConfig)} and read by
+     * {@link #createTypeComparatorBuilder()}.
+     */
+    private boolean[] comparatorOrders = null;
+
+    /**
+     * Creates the type information for rows with the given field types.
+     *
+     * @param types type information of the fields, in field order
+     */
+    public RowTypeInfo(TypeInformation<?>... types) {
+        super(Row.class, types);
+
+        this.fieldNames = new String[types.length];
+        for (int i = 0; i < types.length; i++) {
+            fieldNames[i] = "f" + i;
+        }
+    }
+
+    /**
+     * Creates a comparator by delegating to the super class while temporarily exposing the
+     * sort {@code orders} to {@link #createTypeComparatorBuilder()}.
+     *
+     * @param logicalKeyFields passed through to the super implementation
+     * @param orders sort orders, additionally handed to the comparator builder
+     * @param logicalFieldOffset passed through to the super implementation
+     * @param config passed through to the super implementation
+     * @return the comparator produced by the super implementation
+     */
+    @Override
+    public TypeComparator<Row> createComparator(
+            int[] logicalKeyFields,
+            boolean[] orders,
+            int logicalFieldOffset,
+            ExecutionConfig config) {
+
+        comparatorOrders = orders;
+        try {
+            return super.createComparator(
+                logicalKeyFields,
+                orders,
+                logicalFieldOffset,
+                config);
+        } finally {
+            // always clear the temporary orders, even if the comparator creation failed,
+            // so that a stale value can never be picked up by a later builder
+            comparatorOrders = null;
+        }
+    }
+
+    /**
+     * Creates the builder that assembles a {@link RowComparator}.
+     *
+     * @throws IllegalStateException if no orders are currently set, i.e. the method is called
+     *         outside of {@link #createComparator(int[], boolean[], int, ExecutionConfig)}
+     */
+    @Override
+    protected TypeComparatorBuilder<Row> createTypeComparatorBuilder() {
+        if (comparatorOrders == null) {
+            throw new IllegalStateException("Cannot create comparator builder without orders.");
+        }
+        return new RowTypeComparatorBuilder(comparatorOrders);
+    }
+
+    /** Returns the generated field names (the internal array, not a copy). */
+    @Override
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    /**
+     * Looks up the position of a field by name.
+     *
+     * @param fieldName the name to look for
+     * @return the index of the first field with that name, or {@code -1} if there is none
+     */
+    @Override
+    public int getFieldIndex(String fieldName) {
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (fieldNames[i].equals(fieldName)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /** Creates a {@link RowSerializer} built from one serializer per field type. */
+    @Override
+    public TypeSerializer<Row> createSerializer(ExecutionConfig config) {
+        int len = getArity();
+        TypeSerializer<?>[] fieldSerializers = new TypeSerializer[len];
+        for (int i = 0; i < len; i++) {
+            fieldSerializers[i] = types[i].createSerializer(config);
+        }
+        return new RowSerializer(fieldSerializers);
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof RowTypeInfo;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
+    }
+
+    /** Renders the type as {@code Row(f0: <type>, f1: <type>, ...)}, or {@code Row} if empty. */
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder("Row");
+        if (types.length > 0) {
+            bld.append('(').append(fieldNames[0]).append(": ").append(types[0]);
+
+            for (int i = 1; i < types.length; i++) {
+                bld.append(", ").append(fieldNames[i]).append(": ").append(types[i]);
+            }
+
+            bld.append(')');
+        }
+        return bld.toString();
+    }
+
+    /**
+     * Collects the field comparators and key positions and assembles a {@link RowComparator}
+     * from them.
+     */
+    private class RowTypeComparatorBuilder implements TypeComparatorBuilder<Row> {
+
+        private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();
+        private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>();
+        private final boolean[] comparatorOrders;
+
+        public RowTypeComparatorBuilder(boolean[] comparatorOrders) {
+            this.comparatorOrders = comparatorOrders;
+        }
+
+        @Override
+        public void initializeTypeComparatorBuilder(int size) {
+            fieldComparators.ensureCapacity(size);
+            logicalKeyFields.ensureCapacity(size);
+        }
+
+        @Override
+        public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
+            fieldComparators.add(comparator);
+            logicalKeyFields.add(fieldId);
+        }
+
+        /**
+         * Builds the {@link RowComparator} from the collected fields.
+         *
+         * @throws IllegalStateException if no comparator/key field was added or the collected
+         *         lists are inconsistent
+         */
+        @Override
+        public TypeComparator<Row> createTypeComparator(ExecutionConfig config) {
+            checkState(
+                fieldComparators.size() > 0,
+                "No field comparators were defined for the RowTypeComparatorBuilder."
+            );
+
+            checkState(
+                logicalKeyFields.size() > 0,
+                "No key fields were defined for the RowTypeComparatorBuilder."
+            );
+
+            checkState(
+                fieldComparators.size() == logicalKeyFields.size(),
+                "The number of field comparators and key fields is not equal."
+            );
+
+            final int maxKey = Collections.max(logicalKeyFields);
+
+            checkState(
+                maxKey >= 0,
+                "The maximum key field must be greater or equal than 0."
+            );
+
+            // serializers are only needed for the fields up to (and including) the last key
+            TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1];
+            for (int i = 0; i <= maxKey; i++) {
+                fieldSerializers[i] = types[i].createSerializer(config);
+            }
+
+            int[] keyPositions = new int[logicalKeyFields.size()];
+            for (int i = 0; i < keyPositions.length; i++) {
+                keyPositions[i] = logicalKeyFields.get(i);
+            }
+
+            TypeComparator[] comparators =
+                fieldComparators.toArray(new TypeComparator[fieldComparators.size()]);
+
+            //noinspection unchecked
+            return new RowComparator(
+                getArity(),
+                keyPositions,
+                comparators,
+                (TypeSerializer<Object>[]) fieldSerializers,
+                comparatorOrders);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
new file mode 100644
index 0000000..3587811
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Null-aware comparator that wraps a comparator which does not support null references.
+ * <p>
+ * NOTE: This class assumes to be used within a composite type comparator (such
+ * as {@link RowComparator}) that handles serialized comparison.
+ */
+public class NullAwareComparator<T> extends TypeComparator<T> {
+ private static final long serialVersionUID = 1L;
+
+ private final TypeComparator<T> wrappedComparator;
+ private final boolean order;
+
+ // number of flat fields
+ private final int flatFields;
+
+ // stores the null for reference comparison
+ private boolean nullReference = false;
+
+ public NullAwareComparator(TypeComparator<T> wrappedComparator, boolean order) {
+ this.wrappedComparator = wrappedComparator;
+ this.order = order;
+ this.flatFields = wrappedComparator.getFlatComparators().length;
+ }
+
+ @Override
+ public int hash(T record) {
+ if (record != null) {
+ return wrappedComparator.hash(record);
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public void setReference(T toCompare) {
+ if (toCompare == null) {
+ nullReference = true;
+ } else {
+ nullReference = false;
+ wrappedComparator.setReference(toCompare);
+ }
+ }
+
+ @Override
+ public boolean equalToReference(T candidate) {
+ // both values are null
+ if (candidate == null && nullReference) {
+ return true;
+ }
+ // one value is null
+ else if (candidate == null || nullReference) {
+ return false;
+ }
+ // no null value
+ else {
+ return wrappedComparator.equalToReference(candidate);
+ }
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<T> referencedComparator) {
+ NullAwareComparator otherComparator = (NullAwareComparator) referencedComparator;
+ boolean otherNullReference = otherComparator.nullReference;
+ // both values are null -> equality
+ if (nullReference && otherNullReference) {
+ return 0;
+ }
+ // first value is null -> inequality
+ // but order is considered
+ else if (nullReference) {
+ return order ? 1 : -1;
+ }
+ // second value is null -> inequality
+ // but order is considered
+ else if (otherNullReference) {
+ return order ? -1 : 1;
+ }
+ // no null values
+ else {
+ return wrappedComparator.compareToReference(otherComparator.wrappedComparator);
+ }
+ }
+
+ @Override
+ public int compare(T first, T second) {
+ // both values are null -> equality
+ if (first == null && second == null) {
+ return 0;
+ }
+ // first value is null -> inequality
+ // but order is considered
+ else if (first == null) {
+ return order ? -1 : 1;
+ }
+ // second value is null -> inequality
+ // but order is considered
+ else if (second == null) {
+ return order ? 1 : -1;
+ }
+ // no null values
+ else {
+ return wrappedComparator.compare(first, second);
+ }
+ }
+
+ @Override
+ public int compareSerialized(
+ DataInputView firstSource,
+ DataInputView secondSource) throws IOException {
+
+ throw new UnsupportedOperationException(
+ "Comparator does not support null-aware serialized comparision.");
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return wrappedComparator.supportsNormalizedKey();
+ }
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ int len = wrappedComparator.getNormalizeKeyLen();
+ if (len == Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ } else {
+ // add one for a null byte
+ return len + 1;
+ }
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes - 1);
+ }
+
+ @Override
+ public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+ if (numBytes > 0) {
+ // write a null byte with padding
+ if (record == null) {
+ target.putBoolean(offset, false);
+ // write padding
+ for (int j = 0; j < numBytes - 1; j++) {
+ target.put(offset + 1 + j, (byte) 0);
+ }
+ }
+ // write a non-null byte with key
+ else {
+ target.putBoolean(offset, true);
+ // write key
+ wrappedComparator.putNormalizedKey(record, target, offset + 1, numBytes - 1);
+ }
+ }
+ }
+
+ @Override
+ public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException(
+ "Record serialization with leading normalized keys not supported.");
+ }
+
+ @Override
+ public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException(
+ "Record deserialization with leading normalized keys not supported.");
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return wrappedComparator.invertNormalizedKey();
+ }
+
+ @Override
+ public TypeComparator<T> duplicate() {
+ return new NullAwareComparator<T>(wrappedComparator.duplicate(), order);
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ if (record == null) {
+ for (int i = 0; i < flatFields; i++) {
+ target[index + i] = null;
+ }
+ return flatFields;
+ } else {
+ return wrappedComparator.extractKeys(record, target, index);
+ }
+ }
+
+ @Override
+ public TypeComparator[] getFlatComparators() {
+ // determine the flat comparators and wrap them again in null-aware comparators
+ List<TypeComparator<?>> flatComparators = new ArrayList<>();
+ if (wrappedComparator instanceof CompositeTypeComparator) {
+ ((CompositeTypeComparator) wrappedComparator).getFlatComparator(flatComparators);
+ } else {
+ flatComparators.add(wrappedComparator);
+ }
+
+ TypeComparator<?>[] result = new TypeComparator[flatComparators.size()];
+ for (int i = 0; i < result.length; i++) {
+ result[i] = new NullAwareComparator<>(flatComparators.get(i), order);
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
new file mode 100644
index 0000000..010af7f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+
+/**
+ * Utilities for writing and reading the null mask of a {@link Row}.
+ *
+ * <p>The mask uses one bit per field, packed most-significant-bit first into
+ * {@code ceil(len / 8)} bytes. A set bit marks a null field. Unused low bits of the last byte
+ * are zero.
+ */
+public class NullMaskUtils {
+
+    /**
+     * Writes the null mask of the first {@code len} fields of {@code value}.
+     *
+     * @param len number of fields to cover
+     * @param value row whose fields are checked for null
+     * @param target output the mask bytes are written to
+     * @throws IOException if writing to {@code target} fails
+     */
+    public static void writeNullMask(int len, Row value, DataOutputView target) throws IOException {
+        int fieldPos = 0;
+        while (fieldPos < len) {
+            // number of fields that go into the current byte
+            final int numPos = Math.min(8, len - fieldPos);
+
+            int b = 0x00;
+            for (int bytePos = 0; bytePos < numPos; bytePos++) {
+                b <<= 1;
+                // set bit if field is null
+                if (value.getField(fieldPos + bytePos) == null) {
+                    b |= 0x01;
+                }
+            }
+            // shift bits if last byte is not completely filled
+            b <<= (8 - numPos);
+
+            target.writeByte(b);
+            fieldPos += numPos;
+        }
+    }
+
+    /**
+     * Reads a null mask for {@code len} fields from {@code source} into {@code nullMask}.
+     *
+     * @param len number of fields covered by the mask
+     * @param source input the mask bytes are read from
+     * @param nullMask receives {@code true} at every position whose field is null; must have
+     *        at least {@code len} entries
+     * @throws IOException if reading from {@code source} fails
+     */
+    public static void readIntoNullMask(
+            int len,
+            DataInputView source,
+            boolean[] nullMask) throws IOException {
+
+        int fieldPos = 0;
+        while (fieldPos < len) {
+            final int b = source.readUnsignedByte();
+            final int numPos = Math.min(8, len - fieldPos);
+            decodeMaskByte(b, nullMask, fieldPos, numPos);
+            fieldPos += numPos;
+        }
+    }
+
+    /**
+     * Reads a null mask for {@code len} fields from {@code source} into {@code nullMask} and
+     * copies every mask byte unchanged to {@code target}.
+     *
+     * @param len number of fields covered by the mask
+     * @param source input the mask bytes are read from
+     * @param target output the mask bytes are copied to
+     * @param nullMask receives {@code true} at every position whose field is null; must have
+     *        at least {@code len} entries
+     * @throws IOException if reading or writing fails
+     */
+    public static void readIntoAndCopyNullMask(
+            int len,
+            DataInputView source,
+            DataOutputView target,
+            boolean[] nullMask) throws IOException {
+
+        int fieldPos = 0;
+        while (fieldPos < len) {
+            final int b = source.readUnsignedByte();
+            // copy byte
+            target.writeByte(b);
+            final int numPos = Math.min(8, len - fieldPos);
+            decodeMaskByte(b, nullMask, fieldPos, numPos);
+            fieldPos += numPos;
+        }
+    }
+
+    /**
+     * Decodes the {@code numPos} most significant bits of a mask byte into
+     * {@code nullMask[fieldPos .. fieldPos + numPos - 1]}.
+     */
+    private static void decodeMaskByte(int b, boolean[] nullMask, int fieldPos, int numPos) {
+        for (int bytePos = 0; bytePos < numPos; bytePos++) {
+            nullMask[fieldPos + bytePos] = (b & 0x80) != 0;
+            b <<= 1;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
new file mode 100644
index 0000000..d6c5195
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Comparator for {@link Row}.
+ *
+ * <p>All key fields are compared through {@link NullAwareComparator}s, so key fields may be
+ * {@code null}.
+ */
+public class RowComparator extends CompositeTypeComparator<Row> {
+
+    private static final long serialVersionUID = 1L;
+    /** The number of fields of the Row */
+    private final int arity;
+    /** key positions describe which fields are keys in what order */
+    private final int[] keyPositions;
+    /** null-aware comparators for the key fields, in the same order as the key fields */
+    private final NullAwareComparator<Object>[] comparators;
+    /** serializers to deserialize the first n fields for comparison */
+    private final TypeSerializer<Object>[] serializers;
+    /** auxiliary fields for normalized key support */
+    private final int[] normalizedKeyLengths;
+    private final int numLeadingNormalizableKeys;
+    private final int normalizableKeyPrefixLen;
+    private final boolean invertNormKey;
+
+    // null masks for serialized comparison
+    private final boolean[] nullMask1;
+    private final boolean[] nullMask2;
+
+    // Cache for the deserialized key field objects. The fields are transient and therefore
+    // null after Java deserialization; compareSerialized() re-creates them in that case.
+    private transient Object[] deserializedKeyFields1;
+    private transient Object[] deserializedKeyFields2;
+
+    /**
+     * General constructor for RowComparator.
+     *
+     * @param arity the number of fields of the Row
+     * @param keyPositions key positions describe which fields are keys in what order
+     * @param comparators non-null-aware comparators for the key fields, in the same order as
+     *                    the key fields
+     * @param serializers serializers to deserialize the first n fields for comparison
+     * @param orders sorting orders for the fields
+     */
+    public RowComparator(
+            int arity,
+            int[] keyPositions,
+            TypeComparator<Object>[] comparators,
+            TypeSerializer<Object>[] serializers,
+            boolean[] orders) {
+
+        this(arity, keyPositions, makeNullAware(comparators, orders), serializers);
+    }
+
+    /**
+     * Intermediate constructor that computes the auxiliary normalized-key fields.
+     */
+    private RowComparator(
+            int arity,
+            int[] keyPositions,
+            NullAwareComparator<Object>[] comparators,
+            TypeSerializer<Object>[] serializers) {
+
+        this(
+            arity,
+            keyPositions,
+            comparators,
+            serializers,
+            createAuxiliaryFields(keyPositions, comparators));
+    }
+
+    /**
+     * Intermediate constructor that unpacks the auxiliary normalized-key fields.
+     */
+    private RowComparator(
+            int arity,
+            int[] keyPositions,
+            NullAwareComparator<Object>[] comparators,
+            TypeSerializer<Object>[] serializers,
+            Tuple4<int[], Integer, Integer, Boolean> auxiliaryFields) {
+
+        this(
+            arity,
+            keyPositions,
+            comparators,
+            serializers,
+            auxiliaryFields.f0,
+            auxiliaryFields.f1,
+            auxiliaryFields.f2,
+            auxiliaryFields.f3);
+    }
+
+    /**
+     * Final constructor that assigns all fields and allocates the working buffers
+     * (null masks and deserialization caches).
+     */
+    private RowComparator(
+            int arity,
+            int[] keyPositions,
+            NullAwareComparator<Object>[] comparators,
+            TypeSerializer<Object>[] serializers,
+            int[] normalizedKeyLengths,
+            int numLeadingNormalizableKeys,
+            int normalizableKeyPrefixLen,
+            boolean invertNormKey) {
+
+        this.arity = arity;
+        this.keyPositions = keyPositions;
+        this.comparators = comparators;
+        this.serializers = serializers;
+        this.normalizedKeyLengths = normalizedKeyLengths;
+        this.numLeadingNormalizableKeys = numLeadingNormalizableKeys;
+        this.normalizableKeyPrefixLen = normalizableKeyPrefixLen;
+        this.invertNormKey = invertNormKey;
+        this.nullMask1 = new boolean[arity];
+        this.nullMask2 = new boolean[arity];
+        this.deserializedKeyFields1 = instantiateDeserializationFields();
+        this.deserializedKeyFields2 = instantiateDeserializationFields();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    //  Comparator Methods
+    // --------------------------------------------------------------------------------------------
+
+    /** Adds the flat comparators of all key comparators, in key order. */
+    @Override
+    public void getFlatComparator(List<TypeComparator> flatComparators) {
+        for (NullAwareComparator<Object> c : comparators) {
+            Collections.addAll(flatComparators, c.getFlatComparators());
+        }
+    }
+
+    /**
+     * Combines the hashes of all key fields.
+     *
+     * @throws KeyFieldOutOfBoundsException if a key position does not exist in the record
+     */
+    @Override
+    public int hash(Row record) {
+        int code = 0;
+        int i = 0;
+
+        try {
+            for (; i < keyPositions.length; i++) {
+                code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
+                Object element = record.getField(keyPositions[i]); // element can be null
+                code += comparators[i].hash(element);
+            }
+        } catch (IndexOutOfBoundsException e) {
+            throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+        }
+
+        return code;
+    }
+
+    /**
+     * Sets the key fields of the given row as references of the key comparators.
+     *
+     * @throws KeyFieldOutOfBoundsException if a key position does not exist in the record
+     */
+    @Override
+    public void setReference(Row toCompare) {
+        int i = 0;
+        try {
+            for (; i < keyPositions.length; i++) {
+                Object element = toCompare.getField(keyPositions[i]); // element can be null
+                comparators[i].setReference(element);
+            }
+        } catch (IndexOutOfBoundsException e) {
+            throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+        }
+    }
+
+    /**
+     * Checks whether all key fields of the candidate equal the current references.
+     *
+     * @throws KeyFieldOutOfBoundsException if a key position does not exist in the record
+     */
+    @Override
+    public boolean equalToReference(Row candidate) {
+        int i = 0;
+        try {
+            for (; i < keyPositions.length; i++) {
+                Object element = candidate.getField(keyPositions[i]); // element can be null
+                if (!comparators[i].equalToReference(element)) {
+                    return false;
+                }
+            }
+        } catch (IndexOutOfBoundsException e) {
+            throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+        }
+        return true;
+    }
+
+    /**
+     * Compares the references key by key with those of another {@link RowComparator};
+     * the first non-zero result decides.
+     */
+    @Override
+    public int compareToReference(TypeComparator<Row> referencedComparator) {
+        RowComparator other = (RowComparator) referencedComparator;
+        int i = 0;
+        try {
+            for (; i < keyPositions.length; i++) {
+                int cmp = comparators[i].compareToReference(other.comparators[i]);
+                if (cmp != 0) {
+                    return cmp;
+                }
+            }
+        } catch (IndexOutOfBoundsException e) {
+            throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+        }
+        return 0;
+    }
+
+    /**
+     * Compares two rows key by key; the first non-zero result decides.
+     *
+     * @throws KeyFieldOutOfBoundsException if a key position does not exist in a record
+     */
+    @Override
+    public int compare(Row first, Row second) {
+        int i = 0;
+        try {
+            for (; i < keyPositions.length; i++) {
+                int keyPos = keyPositions[i];
+                Object firstElement = first.getField(keyPos); // element can be null
+                Object secondElement = second.getField(keyPos); // element can be null
+
+                int cmp = comparators[i].compare(firstElement, secondElement);
+                if (cmp != 0) {
+                    return cmp;
+                }
+            }
+        } catch (IndexOutOfBoundsException e) {
+            throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+        }
+        return 0;
+    }
+
+    /**
+     * Compares two serialized rows. Reads the null mask of each row, deserializes the leading
+     * fields covered by {@code serializers} and then compares the key fields.
+     */
+    @Override
+    public int compareSerialized(
+            DataInputView firstSource,
+            DataInputView secondSource) throws IOException {
+
+        // the caches are transient: re-create them if this instance was deserialized
+        if (deserializedKeyFields1 == null) {
+            deserializedKeyFields1 = instantiateDeserializationFields();
+        }
+        if (deserializedKeyFields2 == null) {
+            deserializedKeyFields2 = instantiateDeserializationFields();
+        }
+
+        readIntoNullMask(arity, firstSource, nullMask1);
+        readIntoNullMask(arity, secondSource, nullMask2);
+
+        // deserialize all non-null fields up to the last key field
+        for (int i = 0; i < serializers.length; i++) {
+            TypeSerializer<Object> serializer = serializers[i];
+
+            if (!nullMask1[i]) {
+                deserializedKeyFields1[i] = serializer.deserialize(
+                    deserializedKeyFields1[i],
+                    firstSource);
+            }
+
+            if (!nullMask2[i]) {
+                deserializedKeyFields2[i] = serializer.deserialize(
+                    deserializedKeyFields2[i],
+                    secondSource);
+            }
+        }
+
+        // compare
+        for (int i = 0; i < keyPositions.length; i++) {
+            int keyPos = keyPositions[i];
+
+            // a null field must be passed as null, the cache slot may hold a stale object;
+            // the null-aware comparator takes care of the null ordering
+            Object firstElement = nullMask1[keyPos] ? null : deserializedKeyFields1[keyPos];
+            Object secondElement = nullMask2[keyPos] ? null : deserializedKeyFields2[keyPos];
+
+            int cmp = comparators[i].compare(firstElement, secondElement);
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+
+        return 0;
+    }
+
+    @Override
+    public boolean supportsNormalizedKey() {
+        return numLeadingNormalizableKeys > 0;
+    }
+
+    @Override
+    public boolean supportsSerializationWithKeyNormalization() {
+        return false;
+    }
+
+    @Override
+    public int getNormalizeKeyLen() {
+        return normalizableKeyPrefixLen;
+    }
+
+    @Override
+    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+        return numLeadingNormalizableKeys < keyPositions.length ||
+            normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+            normalizableKeyPrefixLen > keyBytes;
+    }
+
+    /**
+     * Writes the normalized keys of the leading normalizable key fields one after another
+     * until {@code numBytes} bytes are used up.
+     */
+    @Override
+    public void putNormalizedKey(Row record, MemorySegment target, int offset, int numBytes) {
+        int bytesLeft = numBytes;
+        int currentOffset = offset;
+
+        for (int i = 0; i < numLeadingNormalizableKeys && bytesLeft > 0; i++) {
+            // never write more than the remaining budget
+            int len = Math.min(normalizedKeyLengths[i], bytesLeft);
+
+            Object element = record.getField(keyPositions[i]); // element can be null
+            comparators[i].putNormalizedKey(element, target, currentOffset, len);
+
+            bytesLeft -= len;
+            currentOffset += len;
+        }
+    }
+
+    /** Not supported by this comparator; always throws. */
+    @Override
+    public void writeWithKeyNormalization(Row record, DataOutputView target) throws IOException {
+        throw new UnsupportedOperationException(
+            "Record serialization with leading normalized keys not supported.");
+    }
+
+    /** Not supported by this comparator; always throws. */
+    @Override
+    public Row readWithKeyDenormalization(Row reuse, DataInputView source) throws IOException {
+        throw new UnsupportedOperationException(
+            "Record deserialization with leading normalized keys not supported.");
+    }
+
+    @Override
+    public boolean invertNormalizedKey() {
+        return invertNormKey;
+    }
+
+    /**
+     * Creates a copy with duplicated comparators and serializers and fresh working buffers.
+     * The key positions and normalized key lengths are shared with the copy.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public TypeComparator<Row> duplicate() {
+        NullAwareComparator<?>[] comparatorsCopy = new NullAwareComparator<?>[comparators.length];
+        for (int i = 0; i < comparators.length; i++) {
+            comparatorsCopy[i] = (NullAwareComparator<?>) comparators[i].duplicate();
+        }
+
+        TypeSerializer<?>[] serializersCopy = new TypeSerializer<?>[serializers.length];
+        for (int i = 0; i < serializers.length; i++) {
+            serializersCopy[i] = serializers[i].duplicate();
+        }
+
+        return new RowComparator(
+            arity,
+            keyPositions,
+            (NullAwareComparator<Object>[]) comparatorsCopy,
+            (TypeSerializer<Object>[]) serializersCopy,
+            normalizedKeyLengths,
+            numLeadingNormalizableKeys,
+            normalizableKeyPrefixLen,
+            invertNormKey);
+    }
+
+    /**
+     * Extracts the keys of all key fields into {@code target}, starting at {@code index}.
+     *
+     * @return the number of entries written
+     */
+    @Override
+    public int extractKeys(Object record, Object[] target, int index) {
+        int localIndex = index;
+        for (int i = 0; i < comparators.length; i++) {
+            Object element = ((Row) record).getField(keyPositions[i]); // element can be null
+            localIndex += comparators[i].extractKeys(element, target, localIndex);
+        }
+        return localIndex - index;
+    }
+
+    /** Creates one reusable instance per serializer for the deserialization cache. */
+    private Object[] instantiateDeserializationFields() {
+        Object[] newFields = new Object[serializers.length];
+        for (int i = 0; i < serializers.length; i++) {
+            newFields[i] = serializers[i].createInstance();
+        }
+        return newFields;
+    }
+
+    /**
+     * Creates the auxiliary fields for normalized key support.
+     *
+     * @return a tuple of (normalized key length per key, number of leading normalizable keys,
+     *         total length of the normalizable key prefix, whether the key is inverted)
+     */
+    private static Tuple4<int[], Integer, Integer, Boolean>
+    createAuxiliaryFields(int[] keyPositions, NullAwareComparator<Object>[] comparators) {
+
+        int[] normalizedKeyLengths = new int[keyPositions.length];
+        int numLeadingNormalizableKeys = 0;
+        int normalizableKeyPrefixLen = 0;
+        boolean inverted = false;
+
+        for (int i = 0; i < keyPositions.length; i++) {
+            NullAwareComparator<Object> k = comparators[i];
+
+            // as long as the leading keys support normalized keys, we can build up the composite key
+            if (!k.supportsNormalizedKey()) {
+                break;
+            }
+
+            if (i == 0) {
+                // the first comparator decides whether we need to invert the key direction
+                inverted = k.invertNormalizedKey();
+            } else if (k.invertNormalizedKey() != inverted) {
+                // if a successor does not agree on the inversion direction, it cannot be part of the
+                // normalized key
+                break;
+            }
+
+            numLeadingNormalizableKeys++;
+            int len = k.getNormalizeKeyLen();
+            if (len < 0) {
+                throw new RuntimeException(
+                    "Comparator " + k.getClass().getName() +
+                    " specifies an invalid length for the normalized key: " + len);
+            }
+            normalizedKeyLengths[i] = len;
+            normalizableKeyPrefixLen += len;
+            if (normalizableKeyPrefixLen < 0) {
+                // overflow, which means we are out of budget for normalized key space anyways
+                normalizableKeyPrefixLen = Integer.MAX_VALUE;
+                break;
+            }
+        }
+
+        return new Tuple4<>(
+            normalizedKeyLengths,
+            numLeadingNormalizableKeys,
+            normalizableKeyPrefixLen,
+            inverted);
+    }
+
+    /**
+     * Wraps every comparator into a {@link NullAwareComparator} with the matching order.
+     *
+     * @throws IllegalArgumentException if the number of comparators and orders differs
+     */
+    @SuppressWarnings("unchecked")
+    private static NullAwareComparator<Object>[] makeNullAware(
+            TypeComparator<Object>[] comparators,
+            boolean[] orders) {
+
+        checkArgument(comparators.length == orders.length);
+        NullAwareComparator<?>[] result = new NullAwareComparator<?>[comparators.length];
+        for (int i = 0; i < comparators.length; i++) {
+            result[i] = new NullAwareComparator<Object>(comparators[i], orders[i]);
+        }
+        return (NullAwareComparator<Object>[]) result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
new file mode 100644
index 0000000..5457c05
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Serializer for {@link Row}.
+ */
+public class RowSerializer extends TypeSerializer<Row> {
+
+ private static final long serialVersionUID = 1L;
+ private final boolean[] nullMask;
+ private final TypeSerializer<Object>[] fieldSerializers;
+
+ public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
+ this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+ this.nullMask = new boolean[fieldSerializers.length];
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<Row> duplicate() {
+ boolean stateful = false;
+ TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
+
+ for (int i = 0; i < fieldSerializers.length; i++) {
+ duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
+ if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+ // at least one of them is stateful
+ stateful = true;
+ }
+ }
+
+ if (stateful) {
+ return new RowSerializer(duplicateFieldSerializers);
+ } else {
+ return this;
+ }
+ }
+
+ @Override
+ public Row createInstance() {
+ return new Row(fieldSerializers.length);
+ }
+
+ @Override
+ public Row copy(Row from) {
+ int len = fieldSerializers.length;
+
+ if (from.getArity() != len) {
+ throw new RuntimeException("Row arity of from does not match serializers.");
+ }
+
+ Row result = new Row(len);
+ for (int i = 0; i < len; i++) {
+ Object fromField = from.getField(i);
+ if (fromField != null) {
+ Object copy = fieldSerializers[i].copy(fromField);
+ result.setField(i, copy);
+ } else {
+ result.setField(i, null);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Row copy(Row from, Row reuse) {
+ int len = fieldSerializers.length;
+
+ // cannot reuse, do a non-reuse copy
+ if (reuse == null) {
+ return copy(from);
+ }
+
+ if (from.getArity() != len || reuse.getArity() != len) {
+ throw new RuntimeException(
+ "Row arity of reuse or from is incompatible with this RowSerializer.");
+ }
+
+ for (int i = 0; i < len; i++) {
+ Object fromField = from.getField(i);
+ if (fromField != null) {
+ Object reuseField = reuse.getField(i);
+ if (reuseField != null) {
+ Object copy = fieldSerializers[i].copy(fromField, reuseField);
+ reuse.setField(i, copy);
+ } else {
+ Object copy = fieldSerializers[i].copy(fromField);
+ reuse.setField(i, copy);
+ }
+ } else {
+ reuse.setField(i, null);
+ }
+ }
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(Row record, DataOutputView target) throws IOException {
+ int len = fieldSerializers.length;
+
+ if (record.getArity() != len) {
+ throw new RuntimeException("Row arity of from does not match serializers.");
+ }
+
+ // write a null mask
+ writeNullMask(len, record, target);
+
+ // serialize non-null fields
+ for (int i = 0; i < len; i++) {
+ Object o = record.getField(i);
+ if (o != null) {
+ fieldSerializers[i].serialize(o, target);
+ }
+ }
+ }
+
+
+ @Override
+ public Row deserialize(DataInputView source) throws IOException {
+ int len = fieldSerializers.length;
+
+ Row result = new Row(len);
+
+ // read null mask
+ readIntoNullMask(len, source, nullMask);
+
+ for (int i = 0; i < len; i++) {
+ if (nullMask[i]) {
+ result.setField(i, null);
+ } else {
+ result.setField(i, fieldSerializers[i].deserialize(source));
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public Row deserialize(Row reuse, DataInputView source) throws IOException {
+ int len = fieldSerializers.length;
+
+ if (reuse.getArity() != len) {
+ throw new RuntimeException("Row arity of from does not match serializers.");
+ }
+
+ // read null mask
+ readIntoNullMask(len, source, nullMask);
+
+ for (int i = 0; i < len; i++) {
+ if (nullMask[i]) {
+ reuse.setField(i, null);
+ } else {
+ Object reuseField = reuse.getField(i);
+ if (reuseField != null) {
+ reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source));
+ } else {
+ reuse.setField(i, fieldSerializers[i].deserialize(source));
+ }
+ }
+ }
+
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int len = fieldSerializers.length;
+
+ // copy null mask
+ readIntoAndCopyNullMask(len, source, target, nullMask);
+
+ for (int i = 0; i < len; i++) {
+ if (!nullMask[i]) {
+ fieldSerializers[i].copy(source, target);
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (canEqual(obj)) {
+ RowSerializer other = (RowSerializer) obj;
+ if (this.fieldSerializers.length == other.fieldSerializers.length) {
+ for (int i = 0; i < this.fieldSerializers.length; i++) {
+ if (!this.fieldSerializers[i].equals(other.fieldSerializers[i])) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof RowSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(fieldSerializers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
new file mode 100644
index 0000000..6825b71
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A Row can have arbitrary number of fields and contain a set of fields, which may all be
+ * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's
+ * type extraction mechanism can't extract correct field types. So that users should manually
+ * tell Flink the type information via creating a {@link RowTypeInfo}.
+ *
+ * <p>
+ * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
+ * set fields by {@link #setField(int, Object)}.
+ * <p>
+ * Row is in principle serializable. However, it may contain non-serializable fields,
+ * in which case serialization will fail.
+ *
+ */
+@PublicEvolving
+public class Row implements Serializable{
+
+ private static final long serialVersionUID = 1L;
+
+ /** The array to store actual values. */
+ private final Object[] fields;
+
+ /**
+ * Create a new Row instance.
+ * @param arity The number of fields in the Row
+ */
+ public Row(int arity) {
+ this.fields = new Object[arity];
+ }
+
+ /**
+ * Get the number of fields in the Row.
+ * @return The number of fields in the Row.
+ */
+ public int getArity() {
+ return fields.length;
+ }
+
+ /**
+ * Gets the field at the specified position.
+ * @param pos The position of the field, 0-based.
+ * @return The field at the specified position.
+ * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+ */
+ public Object getField(int pos) {
+ return fields[pos];
+ }
+
+ /**
+ * Sets the field at the specified position.
+ *
+ * @param pos The position of the field, 0-based.
+ * @param value The value to be assigned to the field at the specified position.
+ * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+ */
+ public void setField(int pos, Object value) {
+ fields[pos] = value;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < fields.length; i++) {
+ if (i > 0) {
+ sb.append(",");
+ }
+ sb.append(StringUtils.arrayAwareToString(fields[i]));
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Row row = (Row) o;
+
+ return Arrays.equals(fields, row.fields);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(fields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
new file mode 100644
index 0000000..8de7bf7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+ /**
+  * Tests for {@link RowTypeInfo}.
+  */
+ public class RowTypeInfoTest {
+
+     /** Builds a two-field row type whose first field is an int and whose second is as given. */
+     private static RowTypeInfo intAnd(BasicTypeInfo<?> secondFieldType) {
+         return new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, secondFieldType);
+     }
+
+     /** Row types with the same field types are equal and share a hash code. */
+     @Test
+     public void testRowTypeInfoEquality() {
+         RowTypeInfo left = intAnd(BasicTypeInfo.STRING_TYPE_INFO);
+         RowTypeInfo right = intAnd(BasicTypeInfo.STRING_TYPE_INFO);
+
+         assertEquals(left, right);
+         assertEquals(left.hashCode(), right.hashCode());
+     }
+
+     /** Row types differing in one field type are not equal. */
+     @Test
+     public void testRowTypeInfoInequality() {
+         RowTypeInfo left = intAnd(BasicTypeInfo.STRING_TYPE_INFO);
+         RowTypeInfo right = intAnd(BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+         assertNotEquals(left, right);
+         assertNotEquals(left.hashCode(), right.hashCode());
+     }
+
+     /** Nested row types are addressable through field expressions. */
+     @Test
+     public void testNestedRowTypeInfo() {
+         RowTypeInfo nested = new RowTypeInfo(
+             BasicTypeInfo.SHORT_TYPE_INFO,
+             BasicTypeInfo.BIG_DEC_TYPE_INFO);
+         RowTypeInfo outer = new RowTypeInfo(
+             BasicTypeInfo.INT_TYPE_INFO,
+             nested,
+             BasicTypeInfo.STRING_TYPE_INFO);
+
+         String nestedRowType = outer.getTypeAt("f1").toString();
+         String nestedShortType = outer.getTypeAt("f1.f0").toString();
+
+         assertEquals("Row(f0: Short, f1: BigDecimal)", nestedRowType);
+         assertEquals("Short", nestedShortType);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
new file mode 100644
index 0000000..ca54bd4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.Row;
+import org.junit.BeforeClass;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+public class RowComparatorTest extends ComparatorTestBase<Row> {
+
+ private static final RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.SHORT_TYPE_INFO
+ ),
+ TypeExtractor.createTypeInfo(MyPojo.class));
+
+ private static MyPojo testPojo1 = new MyPojo();
+ private static MyPojo testPojo2 = new MyPojo();
+ private static MyPojo testPojo3 = new MyPojo();
+
+ private static final Row[] data = new Row[]{
+ createRow(null, null, null, null, null),
+ createRow(0, null, null, null, null),
+ createRow(0, 0.0, null, null, null),
+ createRow(0, 0.0, "a", null, null),
+ createRow(1, 0.0, "a", null, null),
+ createRow(1, 1.0, "a", null, null),
+ createRow(1, 1.0, "b", null, null),
+ createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+ createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+ };
+
+ @BeforeClass
+ public static void init() {
+ // TODO we cannot test null here as PojoComparator has no support for null keys
+ testPojo1.name = "";
+ testPojo2.name = "Test1";
+ testPojo3.name = "Test2";
+ }
+
+ @Override
+ protected void deepEquals(String message, Row should, Row is) {
+ int arity = should.getArity();
+ assertEquals(message, arity, is.getArity());
+ for (int i = 0; i < arity; i++) {
+ Object copiedValue = should.getField(i);
+ Object element = is.getField(i);
+ assertEquals(message, element, copiedValue);
+ }
+ }
+
+ @Override
+ protected TypeComparator<Row> createComparator(boolean ascending) {
+ return typeInfo.createComparator(
+ new int[] {0, 1, 2, 3, 4, 5, 6},
+ new boolean[] {ascending, ascending, ascending, ascending, ascending, ascending, ascending},
+ 0,
+ new ExecutionConfig());
+ }
+
+ @Override
+ protected TypeSerializer<Row> createSerializer() {
+ return typeInfo.createSerializer(new ExecutionConfig());
+ }
+
+ @Override
+ protected Row[] getSortedTestData() {
+ return data;
+ }
+
+ @Override
+ protected boolean supportsNullKeys() {
+ return true;
+ }
+
+ private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
+ Row row = new Row(5);
+ row.setField(0, f0);
+ row.setField(1, f1);
+ row.setField(2, f2);
+ row.setField(3, f3);
+ row.setField(4, f4);
+ return row;
+ }
+
+ public static class MyPojo implements Serializable, Comparable<MyPojo> {
+ // we cannot use null because the PojoComparator does not support null properly
+ public String name = "";
+
+ @Override
+ public int compareTo(MyPojo o) {
+ if (name == null && o.name == null) {
+ return 0;
+ } else if (name == null) {
+ return -1;
+ } else if (o.name == null) {
+ return 1;
+ } else {
+ return name.compareTo(o.name);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MyPojo myPojo = (MyPojo) o;
+
+ return name != null ? name.equals(myPojo.name) : myPojo.name == null;
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
new file mode 100644
index 0000000..d0fdbd6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.types.Row;
+import org.junit.BeforeClass;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link RowComparator} for wide rows.
+ */
+public class RowComparatorWithManyFieldsTests extends ComparatorTestBase<Row> {
+
+ private static final int numberOfFields = 10;
+ private static RowTypeInfo typeInfo;
+ private static final Row[] data = new Row[]{
+ createRow(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", "j0"),
+ createRow("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", "j1"),
+ createRow("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", "j2"),
+ createRow("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3")
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TypeInformation<?>[] fieldTypes = new TypeInformation[numberOfFields];
+ for (int i = 0; i < numberOfFields; i++) {
+ fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ typeInfo = new RowTypeInfo(fieldTypes);
+
+ }
+
+ @Override
+ protected void deepEquals(String message, Row should, Row is) {
+ int arity = should.getArity();
+ assertEquals(message, arity, is.getArity());
+ for (int i = 0; i < arity; i++) {
+ Object copiedValue = should.getField(i);
+ Object element = is.getField(i);
+ assertEquals(message, element, copiedValue);
+ }
+ }
+
+ @Override
+ protected TypeComparator<Row> createComparator(boolean ascending) {
+ return typeInfo.createComparator(
+ new int[]{0},
+ new boolean[]{ascending},
+ 0,
+ new ExecutionConfig());
+ }
+
+ @Override
+ protected TypeSerializer<Row> createSerializer() {
+ return typeInfo.createSerializer(new ExecutionConfig());
+ }
+
+ @Override
+ protected Row[] getSortedTestData() {
+ return data;
+ }
+
+ @Override
+ protected boolean supportsNullKeys() {
+ return true;
+ }
+
+ private static Row createRow(Object... values) {
+ checkNotNull(values);
+ checkArgument(values.length == numberOfFields);
+ Row row = new Row(numberOfFields);
+ for (int i = 0; i < values.length; i++) {
+ row.setField(i, values[i]);
+ }
+ return row;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
new file mode 100644
index 0000000..d08d68a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+public class RowSerializerTest {
+
+ @Test
+ public void testRowSerializer() {
+ TypeInformation<Row> typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+ Row row1 = new Row(2);
+ row1.setField(0, 1);
+ row1.setField(1, "a");
+
+ Row row2 = new Row(2);
+ row2.setField(0, 2);
+ row2.setField(1, null);
+
+ TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+ RowSerializerTestInstance instance = new RowSerializerTestInstance(serializer, row1, row2);
+ instance.testAll();
+ }
+
+ @Test
+ public void testLargeRowSerializer() {
+ TypeInformation<Row> typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ Row row = new Row(13);
+ row.setField(0, 2);
+ row.setField(1, null);
+ row.setField(3, null);
+ row.setField(4, null);
+ row.setField(5, null);
+ row.setField(6, null);
+ row.setField(7, null);
+ row.setField(8, null);
+ row.setField(9, null);
+ row.setField(10, null);
+ row.setField(11, null);
+ row.setField(12, "Test");
+
+ TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+ RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, row);
+ testInstance.testAll();
+ }
+
+ @Test
+ public void testRowSerializerWithComplexTypes() {
+ TypeInformation<Row> typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.SHORT_TYPE_INFO),
+ TypeExtractor.createTypeInfo(MyPojo.class));
+
+ MyPojo testPojo1 = new MyPojo();
+ testPojo1.name = null;
+ MyPojo testPojo2 = new MyPojo();
+ testPojo2.name = "Test1";
+ MyPojo testPojo3 = new MyPojo();
+ testPojo3.name = "Test2";
+
+ Row[] data = new Row[]{
+ createRow(null, null, null, null, null),
+ createRow(0, null, null, null, null),
+ createRow(0, 0.0, null, null, null),
+ createRow(0, 0.0, "a", null, null),
+ createRow(1, 0.0, "a", null, null),
+ createRow(1, 1.0, "a", null, null),
+ createRow(1, 1.0, "b", null, null),
+ createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+ createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+ createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+ };
+
+ TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+ RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, data);
+ testInstance.testAll();
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
+ Row row = new Row(5);
+ row.setField(0, f0);
+ row.setField(1, f1);
+ row.setField(2, f2);
+ row.setField(3, f3);
+ row.setField(4, f4);
+ return row;
+ }
+
+
+ private class RowSerializerTestInstance extends SerializerTestInstance<Row> {
+
+ RowSerializerTestInstance(
+ TypeSerializer<Row> serializer,
+ Row... testData) {
+ super(serializer, Row.class, -1, testData);
+ }
+
+ @Override
+ protected void deepEquals(String message, Row should, Row is) {
+ int arity = should.getArity();
+ assertEquals(message, arity, is.getArity());
+ for (int i = 0; i < arity; i++) {
+ Object copiedValue = should.getField(i);
+ Object element = is.getField(i);
+ assertEquals(message, element, copiedValue);
+ }
+ }
+ }
+
+ public static class MyPojo implements Serializable, Comparable<MyPojo> {
+ public String name = null;
+
+ @Override
+ public int compareTo(MyPojo o) {
+ if (name == null && o.name == null) {
+ return 0;
+ } else if (name == null) {
+ return -1;
+ } else if (o.name == null) {
+ return 1;
+ } else {
+ return name.compareTo(o.name);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MyPojo myPojo = (MyPojo) o;
+
+ return name != null ? name.equals(myPojo.name) : myPojo.name == null;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/types/RowTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
new file mode 100644
index 0000000..35ba32d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class RowTest {
+ @Test
+ public void testRowToString() {
+ Row row = new Row(5);
+ row.setField(0, 1);
+ row.setField(1, "hello");
+ row.setField(2, null);
+ row.setField(3, new Tuple2<>(2, "hi"));
+ row.setField(4, "hello world");
+
+ assertEquals("1,hello,null,(2,hi),hello world", row.toString());
+ }
+}