You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/15 10:49:54 UTC

[4/7] flink git commit: [FLINK-5187] [core] Port Row and related type utils to Java and move them to flink-core.

[FLINK-5187] [core] Port Row and related type utils to Java and move them to flink-core.

This closes #2968.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f8a255
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f8a255
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f8a255

Branch: refs/heads/master
Commit: 86f8a255db6ce2ff9e09c2824e85c4930427ecdb
Parents: 15e7f0a
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Thu Dec 8 22:44:29 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 15 11:36:40 2016 +0100

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 |   6 +-
 .../flink/api/java/typeutils/RowTypeInfo.java   | 203 ++++++++
 .../typeutils/runtime/NullAwareComparator.java  | 240 +++++++++
 .../java/typeutils/runtime/NullMaskUtils.java   | 105 ++++
 .../java/typeutils/runtime/RowComparator.java   | 488 +++++++++++++++++++
 .../java/typeutils/runtime/RowSerializer.java   | 243 +++++++++
 .../main/java/org/apache/flink/types/Row.java   | 116 +++++
 .../api/java/typeutils/RowTypeInfoTest.java     |  69 +++
 .../typeutils/runtime/RowComparatorTest.java    | 156 ++++++
 .../RowComparatorWithManyFieldsTests.java       | 103 ++++
 .../typeutils/runtime/RowSerializerTest.java    | 197 ++++++++
 .../java/org/apache/flink/types/RowTest.java    |  37 ++
 12 files changed, 1961 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index ea02df0..2b43563 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -87,9 +87,11 @@ Internally, Flink makes the following distinctions between types:
 
 * Composite types
 
-  * Flink Java Tuples (part of the Flink Java API)
+  * Flink Java Tuples (part of the Flink Java API): max 25 fields, null fields not supported
 
-  * Scala *case classes* (including Scala tuples)
+  * Scala *case classes* (including Scala tuples): max 22 fields, null fields not supported
+
+  * Row: tuples with an arbitrary number of fields and support for null fields
 
   * POJOs: classes that follow a certain bean-like pattern
 

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
new file mode 100644
index 0000000..03cbe61
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link TypeInformation} for {@link Row}; fields are named "f0", "f1", ... by position.
+ */
+@PublicEvolving
+public class RowTypeInfo extends TupleTypeInfoBase<Row> {
+
+	private static final long serialVersionUID = 9158518989896601963L;
+
+	protected final String[] fieldNames;
+	/** Sort orders set by createComparator() for createTypeComparatorBuilder(); null outside that call. */
+	private boolean[] comparatorOrders = null;
+
+	public RowTypeInfo(TypeInformation<?>... types) {
+		super(Row.class, types);
+
+		this.fieldNames = new String[types.length];
+
+		for (int i = 0; i < types.length; i++) {
+			fieldNames[i] = "f" + i; // positional default names: f0, f1, ...
+		}
+	}
+
+	@Override
+	public TypeComparator<Row> createComparator(
+		int[] logicalKeyFields,
+		boolean[] orders,
+		int logicalFieldOffset,
+		ExecutionConfig config) {
+
+		comparatorOrders = orders;
+		try {
+			return super.createComparator(
+				logicalKeyFields, orders, logicalFieldOffset, config);
+		} finally {
+			// always clear the temporary orders, even if comparator creation fails
+			comparatorOrders = null;
+		}
+	}
+
+	@Override
+	protected TypeComparatorBuilder<Row> createTypeComparatorBuilder() {
+		if (comparatorOrders == null) { // only valid while createComparator() is running
+			throw new IllegalStateException("Cannot create comparator builder without orders.");
+		}
+		return new RowTypeComparatorBuilder(comparatorOrders);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames; // the internal array itself, not a copy
+	}
+
+	@Override
+	public int getFieldIndex(String fieldName) {
+		for (int i = 0; i < fieldNames.length; i++) {
+			if (fieldNames[i].equals(fieldName)) {
+				return i;
+			}
+		}
+		return -1; // no field with that name
+	}
+
+	@Override
+	public TypeSerializer<Row> createSerializer(ExecutionConfig config) {
+		int len = getArity();
+		TypeSerializer<?>[] fieldSerializers = new TypeSerializer[len];
+		for (int i = 0; i < len; i++) {
+			fieldSerializers[i] = types[i].createSerializer(config); // one serializer per field
+		}
+		return new RowSerializer(fieldSerializers);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof RowTypeInfo;
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder bld = new StringBuilder("Row"); // "Row(f0: <type>, f1: <type>)", or just "Row" without fields
+		if (types.length > 0) {
+			bld.append('(').append(fieldNames[0]).append(": ").append(types[0]);
+
+			for (int i = 1; i < types.length; i++) {
+				bld.append(", ").append(fieldNames[i]).append(": ").append(types[i]);
+			}
+
+			bld.append(')');
+		}
+		return bld.toString();
+	}
+
+	private class RowTypeComparatorBuilder implements TypeComparatorBuilder<Row> { // inner class: uses types and getArity() of the enclosing instance
+
+		private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();
+		private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>();
+		private final boolean[] comparatorOrders;
+
+		public RowTypeComparatorBuilder(boolean[] comparatorOrders) {
+			this.comparatorOrders = comparatorOrders;
+		}
+
+		@Override
+		public void initializeTypeComparatorBuilder(int size) {
+			fieldComparators.ensureCapacity(size);
+			logicalKeyFields.ensureCapacity(size);
+		}
+
+		@Override
+		public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
+			fieldComparators.add(comparator); // both lists grow in lockstep, see the size check below
+			logicalKeyFields.add(fieldId);
+		}
+
+		@Override
+		public TypeComparator<Row> createTypeComparator(ExecutionConfig config) {
+			checkState(
+				fieldComparators.size() > 0,
+				"No field comparators were defined for the RowTypeComparatorBuilder."
+			);
+
+			checkState(
+				logicalKeyFields.size() > 0,
+				"No key fields were defined for the RowTypeComparatorBuilder."
+			);
+
+			checkState(
+				fieldComparators.size() == logicalKeyFields.size(),
+				"The number of field comparators and key fields is not equal."
+			);
+
+			final int maxKey = Collections.max(logicalKeyFields);
+
+			checkState(
+				maxKey >= 0,
+				"The maximum key field must be greater or equal than 0."
+			);
+
+			TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1]; // fields 0..maxKey only
+
+			for (int i = 0; i <= maxKey; i++) {
+				fieldSerializers[i] = types[i].createSerializer(config);
+			}
+
+			int[] keyPositions = new int[logicalKeyFields.size()]; // unbox the key fields into an int[]
+			for (int i = 0; i < keyPositions.length; i++) {
+				keyPositions[i] = logicalKeyFields.get(i);
+			}
+
+			TypeComparator[] comparators = new TypeComparator[fieldComparators.size()];
+			for (int i = 0; i < fieldComparators.size(); i++) {
+				comparators[i] = fieldComparators.get(i);
+			}
+
+			//noinspection unchecked
+			return new RowComparator(
+				getArity(),
+				keyPositions,
+				comparators,
+				(TypeSerializer<Object>[]) fieldSerializers,
+				comparatorOrders);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
new file mode 100644
index 0000000..3587811
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullAwareComparator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Null-aware comparator that wraps a comparator which does not support null references.
+ * <p>
+ * NOTE: This class is meant to be used within a composite type comparator (such
+ * as {@link RowComparator}) that handles serialized comparison itself.
+ */
+public class NullAwareComparator<T> extends TypeComparator<T> {
+	private static final long serialVersionUID = 1L;
+
+	private final TypeComparator<T> wrappedComparator;
+	private final boolean order; // true: compare() ranks null before non-null; false: after
+
+	// number of flat comparators of the wrapped comparator (slots extractKeys() nulls out for a null record)
+	private final int flatFields;
+
+	// true if the last reference passed to setReference() was null
+	private boolean nullReference = false;
+
+	public NullAwareComparator(TypeComparator<T> wrappedComparator, boolean order) {
+		this.wrappedComparator = wrappedComparator;
+		this.order = order;
+		this.flatFields = wrappedComparator.getFlatComparators().length;
+	}
+
+	@Override
+	public int hash(T record) {
+		if (record != null) {
+			return wrappedComparator.hash(record);
+		} else {
+			return 0; // null records hash to 0
+		}
+	}
+
+	@Override
+	public void setReference(T toCompare) {
+		if (toCompare == null) {
+			nullReference = true; // the wrapped comparator never receives a null reference
+		} else {
+			nullReference = false;
+			wrappedComparator.setReference(toCompare);
+		}
+	}
+
+	@Override
+	public boolean equalToReference(T candidate) {
+		// both values are null
+		if (candidate == null && nullReference) {
+			return true;
+		}
+		// one value is null
+		else if (candidate == null || nullReference) {
+			return false;
+		}
+		// no null value
+		else {
+			return wrappedComparator.equalToReference(candidate);
+		}
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		NullAwareComparator otherComparator = (NullAwareComparator) referencedComparator; // must be a NullAwareComparator
+		boolean otherNullReference = otherComparator.nullReference;
+		// both values are null -> equality
+		if (nullReference && otherNullReference) {
+			return 0;
+		}
+		// own reference is null -> inequality
+		// but order is considered
+		else if (nullReference) {
+			return order ? 1 : -1;
+		}
+		// other reference is null -> inequality
+		// but order is considered
+		else if (otherNullReference) {
+			return order ? -1 : 1;
+		}
+		// no null values
+		else {
+			return wrappedComparator.compareToReference(otherComparator.wrappedComparator);
+		}
+	}
+
+	@Override
+	public int compare(T first, T second) {
+		// both values are null -> equality
+		if (first == null && second == null) {
+			return 0;
+		}
+		// first value is null -> inequality
+		// but order is considered
+		else if (first == null) {
+			return order ? -1 : 1;
+		}
+		// second value is null -> inequality
+		// but order is considered
+		else if (second == null) {
+			return order ? 1 : -1;
+		}
+		// no null values
+		else {
+			return wrappedComparator.compare(first, second);
+		}
+	}
+
+	@Override
+	public int compareSerialized(
+		DataInputView firstSource,
+		DataInputView secondSource) throws IOException {
+
+		throw new UnsupportedOperationException(
+			"Comparator does not support null-aware serialized comparision.");
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return wrappedComparator.supportsNormalizedKey();
+	}
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false; // both key-normalized (de)serialization methods below throw
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		int len = wrappedComparator.getNormalizeKeyLen();
+		if (len == Integer.MAX_VALUE) { // pass MAX_VALUE through unchanged; len + 1 would overflow
+			return Integer.MAX_VALUE;
+		} else {
+			// one additional leading byte for the null indicator
+			return len + 1;
+		}
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		return wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes - 1); // one byte is used by the null indicator
+	}
+
+	@Override
+	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+		if (numBytes > 0) {
+			// null record: write a "false" indicator followed by zero padding
+			if (record == null) {
+				target.putBoolean(offset, false);
+				// fill the remaining numBytes - 1 bytes with zeros
+				for (int j = 0; j < numBytes - 1; j++) {
+					target.put(offset + 1 + j, (byte) 0);
+				}
+			}
+			// non-null record: write a "true" indicator followed by the wrapped key
+			else {
+				target.putBoolean(offset, true);
+				// the wrapped comparator writes its key into the remaining numBytes - 1 bytes
+				wrappedComparator.putNormalizedKey(record, target, offset + 1, numBytes - 1);
+			}
+		}
+	}
+
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException(
+			"Record serialization with leading normalized keys not supported.");
+	}
+
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException(
+			"Record deserialization with leading normalized keys not supported.");
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return wrappedComparator.invertNormalizedKey();
+	}
+
+	@Override
+	public TypeComparator<T> duplicate() {
+		return new NullAwareComparator<T>(wrappedComparator.duplicate(), order); // the copy starts with nullReference == false
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		if (record == null) {
+			for (int i = 0; i < flatFields; i++) {
+				target[index + i] = null; // a null record yields null for every flat key
+			}
+			return flatFields;
+		} else {
+			return wrappedComparator.extractKeys(record, target, index);
+		}
+	}
+
+	@Override
+	public TypeComparator[] getFlatComparators() {
+		// collect the flat comparators of the wrapped comparator and wrap each in a null-aware comparator
+		List<TypeComparator<?>> flatComparators = new ArrayList<>();
+		if (wrappedComparator instanceof CompositeTypeComparator) {
+			((CompositeTypeComparator) wrappedComparator).getFlatComparator(flatComparators);
+		} else {
+			flatComparators.add(wrappedComparator);
+		}
+
+		TypeComparator<?>[] result = new TypeComparator[flatComparators.size()];
+		for (int i = 0; i < result.length; i++) {
+			result[i] = new NullAwareComparator<>(flatComparators.get(i), order); // same order flag for every flat comparator
+		}
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
new file mode 100644
index 0000000..010af7f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+
+public class NullMaskUtils { // static helpers that write and read a bit mask marking the null fields of a Row
+
+	public static void writeNullMask(int len, Row value, DataOutputView target) throws IOException { // writes one bit per field, one byte per 8 fields
+		int b = 0x00;
+		int bytePos = 0;
+
+		int fieldPos = 0;
+		int numPos = 0;
+		while (fieldPos < len) {
+			b = 0x00;
+			// collect the bits of up to 8 fields into one byte
+			bytePos = 0;
+			numPos = Math.min(8, len - fieldPos);
+			while (bytePos < numPos) {
+				b = b << 1;
+				// set the lowest bit if the field is null
+				if (value.getField(fieldPos + bytePos) == null) {
+					b |= 0x01;
+				}
+				bytePos += 1;
+			}
+			fieldPos += numPos;
+			// left-align the bits so the first field of the group ends up in the highest bit
+			b <<= (8 - bytePos);
+			// write byte
+			target.writeByte(b);
+		}
+	}
+
+	public static void readIntoNullMask( // reads the mask bytes for len fields and fills nullMask[0..len-1]
+		int len,
+		DataInputView source,
+		boolean[] nullMask) throws IOException {
+
+		int b = 0x00;
+		int bytePos = 0;
+
+		int fieldPos = 0;
+		int numPos = 0;
+		while (fieldPos < len) {
+			// read the mask byte for the next group of up to 8 fields
+			b = source.readUnsignedByte();
+			bytePos = 0;
+			numPos = Math.min(8, len - fieldPos);
+			while (bytePos < numPos) {
+				nullMask[fieldPos + bytePos] = (b & 0x80) > 0; // test the highest bit, then shift the next one into place
+				b = b << 1;
+				bytePos += 1;
+			}
+			fieldPos += numPos;
+		}
+	}
+
+	public static void readIntoAndCopyNullMask( // like readIntoNullMask, but also copies every mask byte to target
+		int len,
+		DataInputView source,
+		DataOutputView target,
+		boolean[] nullMask) throws IOException {
+
+		int b = 0x00;
+		int bytePos = 0;
+
+		int fieldPos = 0;
+		int numPos = 0;
+		while (fieldPos < len) {
+			// read the mask byte for the next group of up to 8 fields
+			b = source.readUnsignedByte();
+			// copy the byte unchanged before it is consumed by the shifts below
+			target.writeByte(b);
+			bytePos = 0;
+			numPos = Math.min(8, len - fieldPos);
+			while (bytePos < numPos) {
+				nullMask[fieldPos + bytePos] = (b & 0x80) > 0; // test the highest bit, then shift the next one into place
+				b = b << 1;
+				bytePos += 1;
+			}
+			fieldPos += numPos;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
new file mode 100644
index 0000000..d6c5195
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.types.KeyFieldOutOfBoundsException;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Comparator for {@link Row}
+ */
+public class RowComparator extends CompositeTypeComparator<Row> {
+
+	private static final long serialVersionUID = 1L;
+	/** The number of fields of the Row */
+	private final int arity;
+	/** key positions describe which fields are keys in what order */
+	private final int[] keyPositions;
+	/** null-aware comparators for the key fields, in the same order as the key fields */
+	private final NullAwareComparator<Object>[] comparators;
+	/** serializers to deserialize the first n fields for comparison */
+	private final TypeSerializer<Object>[] serializers;
+	/** auxiliary fields for normalized key support */
+	private final int[] normalizedKeyLengths;
+	private final int numLeadingNormalizableKeys;
+	private final int normalizableKeyPrefixLen;
+	private final boolean invertNormKey;
+
+	// null masks for serialized comparison
+	private final boolean[] nullMask1;
+	private final boolean[] nullMask2;
+
+	// cache for the deserialized key field objects
+	transient private final Object[] deserializedKeyFields1;
+	transient private final Object[] deserializedKeyFields2;
+
+	/**
+	 * General constructor for RowComparator; wraps the comparators via makeNullAware() and delegates.
+	 *
+	 * @param arity        the number of fields of the Row
+	 * @param keyPositions key positions describe which fields are keys in what order
+	 * @param comparators  non-null-aware comparators for the key fields, in the same order as
+	 *                     the key fields
+	 * @param serializers  serializers to deserialize the first n fields for comparison
+	 * @param orders       sorting orders, passed to makeNullAware() together with the comparators
+	 */
+	public RowComparator(
+		int arity,
+		int[] keyPositions,
+		TypeComparator<Object>[] comparators,
+		TypeSerializer<Object>[] serializers,
+		boolean[] orders) {
+
+		this(arity, keyPositions, makeNullAware(comparators, orders), serializers);
+	}
+
+
+	/**
+	 * Intermediate constructor that computes the auxiliary fields via createAuxiliaryFields().
+	 */
+	private RowComparator(
+		int arity,
+		int[] keyPositions,
+		NullAwareComparator<Object>[] comparators,
+		TypeSerializer<Object>[] serializers) {
+
+		this(
+			arity,
+			keyPositions,
+			comparators,
+			serializers,
+			createAuxiliaryFields(keyPositions, comparators));
+	}
+
+	/**
+	 * Intermediate constructor that unpacks the auxiliary fields tuple (f0..f3) into single arguments.
+	 */
+	private RowComparator(
+		int arity,
+		int[] keyPositions,
+		NullAwareComparator<Object>[] comparators,
+		TypeSerializer<Object>[] serializers,
+		Tuple4<int[], Integer, Integer, Boolean> auxiliaryFields) {
+
+		this(
+			arity,
+			keyPositions,
+			comparators,
+			serializers,
+			auxiliaryFields.f0, // normalizedKeyLengths
+			auxiliaryFields.f1, // numLeadingNormalizableKeys
+			auxiliaryFields.f2, // normalizableKeyPrefixLen
+			auxiliaryFields.f3); // invertNormKey
+	}
+
+	/**
+	 * Final constructor: assigns all fields and allocates the null masks and deserialization caches.
+	 */
+	private RowComparator(
+		int arity,
+		int[] keyPositions,
+		NullAwareComparator<Object>[] comparators,
+		TypeSerializer<Object>[] serializers,
+		int[] normalizedKeyLengths,
+		int numLeadingNormalizableKeys,
+		int normalizableKeyPrefixLen,
+		boolean invertNormKey) {
+
+		this.arity = arity;
+		this.keyPositions = keyPositions;
+		this.comparators = comparators;
+		this.serializers = serializers;
+		this.normalizedKeyLengths = normalizedKeyLengths;
+		this.numLeadingNormalizableKeys = numLeadingNormalizableKeys;
+		this.normalizableKeyPrefixLen = normalizableKeyPrefixLen;
+		this.invertNormKey = invertNormKey;
+		this.nullMask1 = new boolean[arity]; // one null flag per Row field
+		this.nullMask2 = new boolean[arity];
+		deserializedKeyFields1 = instantiateDeserializationFields(); // reads this.serializers, so it must be assigned above
+		deserializedKeyFields2 = instantiateDeserializationFields();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Comparator Methods
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void getFlatComparator(List<TypeComparator> flatComparators) { // appends the flat comparators of every key comparator, in key order
+		for (NullAwareComparator<Object> c : comparators) {
+			Collections.addAll(flatComparators, c.getFlatComparators());
+		}
+	}
+
+	@Override
+	public int hash(Row record) { // combines the hashes of all key fields of the record
+		int code = 0;
+		int i = 0; // declared outside the loop so the catch block can report the failing key
+
+		try {
+			for (; i < keyPositions.length; i++) {
+				code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; // salt index is masked to 0..31
+				Object element = record.getField(keyPositions[i]); // element can be null
+				code += comparators[i].hash(element);
+			}
+		} catch (IndexOutOfBoundsException e) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+		}
+
+		return code;
+	}
+
+	@Override
+	public void setReference(Row toCompare) { // hands every key field of the row to its comparator as reference
+		int i = 0; // declared outside the loop so the catch block can report the failing key
+		try {
+			for (; i < keyPositions.length; i++) {
+				TypeComparator<Object> comparator = comparators[i];
+				Object element = toCompare.getField(keyPositions[i]);
+				comparator.setReference(element);   // element can be null
+			}
+		} catch (IndexOutOfBoundsException e) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+		}
+	}
+
+	@Override
+	public boolean equalToReference(Row candidate) { // true only if every key field equals the stored reference
+		int i = 0; // declared outside the loop so the catch block can report the failing key
+		try {
+			for (; i < keyPositions.length; i++) {
+				TypeComparator<Object> comparator = comparators[i];
+				Object element = candidate.getField(keyPositions[i]);   // element can be null
+				// stop at the first key field that differs from its reference
+				if (!comparator.equalToReference(element)) {
+					return false;
+				}
+			}
+		} catch (IndexOutOfBoundsException e) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+		}
+		return true;
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<Row> referencedComparator) { // compares the stored references key by key
+		RowComparator other = (RowComparator) referencedComparator; // must be a RowComparator
+		int i = 0;
+		try {
+			for (; i < keyPositions.length; i++) {
+				int cmp = comparators[i].compareToReference(other.comparators[i]);
+				if (cmp != 0) { // the first differing key decides
+					return cmp;
+				}
+			}
+		} catch (IndexOutOfBoundsException e) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+		}
+		return 0;
+	}
+
+	@Override
+	public int compare(Row first, Row second) { // compares two rows key by key; the first differing key decides
+		int i = 0; // declared outside the loop so the catch block can report the failing key
+		try {
+			for (; i < keyPositions.length; i++) {
+				int keyPos = keyPositions[i];
+				TypeComparator<Object> comparator = comparators[i];
+				Object firstElement = first.getField(keyPos);   // element can be null
+				Object secondElement = second.getField(keyPos); // element can be null
+
+				int cmp = comparator.compare(firstElement, secondElement);
+				if (cmp != 0) {
+					return cmp;
+				}
+			}
+		} catch (IndexOutOfBoundsException e) {
+			throw new KeyFieldOutOfBoundsException(keyPositions[i]);
+		}
+		return 0;
+	}
+
+	@Override
+	public int compareSerialized(
+		DataInputView firstSource,
+		DataInputView secondSource) throws IOException {
+
+		int len = serializers.length; // number of leading fields that get deserialized
+		int keyLen = keyPositions.length;
+
+		readIntoNullMask(arity, firstSource, nullMask1); // the null mask is read before any field data
+		readIntoNullMask(arity, secondSource, nullMask2);
+
+		// deserialize the leading fields of both records into the reusable caches
+		for (int i = 0; i < len; i++) {
+			TypeSerializer<Object> serializer = serializers[i];
+
+			// deserialize field 1 (nothing is read for a null field)
+			if (!nullMask1[i]) {
+				deserializedKeyFields1[i] = serializer.deserialize(
+					deserializedKeyFields1[i],
+					firstSource);
+			}
+
+			// deserialize field 2 (nothing is read for a null field)
+			if (!nullMask2[i]) {
+				deserializedKeyFields2[i] = serializer.deserialize(
+					deserializedKeyFields2[i],
+					secondSource);
+			}
+		}
+
+		// compare key by key; the first differing key decides
+		for (int i = 0; i < keyLen; i++) {
+			int keyPos = keyPositions[i];
+			TypeComparator<Object> comparator = comparators[i];
+
+			boolean isNull1 = nullMask1[keyPos];
+			boolean isNull2 = nullMask2[keyPos];
+
+			int cmp = 0;
+			// both values are null -> equality
+			if (isNull1 && isNull2) {
+				cmp = 0;
+			}
+			// first value is null -> pass null explicitly, the cache may hold a stale value
+			else if (isNull1) {
+				cmp = comparator.compare(null, deserializedKeyFields2[keyPos]);
+			}
+			// second value is null -> pass null explicitly, the cache may hold a stale value
+			else if (isNull2) {
+				cmp = comparator.compare(deserializedKeyFields1[keyPos], null);
+			}
+			// no null values
+			else {
+				cmp = comparator.compare(
+					deserializedKeyFields1[keyPos],
+					deserializedKeyFields2[keyPos]);
+			}
+
+			if (cmp != 0) {
+				return cmp;
+			}
+		}
+
+		return 0;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return numLeadingNormalizableKeys > 0; // at least one leading key must be normalizable
+	}
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false; // this comparator never (de)serializes records with a leading normalized key
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return normalizableKeyPrefixLen; // precomputed auxiliary field, passed in through the constructor
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) { // prefix-only if any of the three conditions below holds
+		return numLeadingNormalizableKeys < keyPositions.length ||
+				normalizableKeyPrefixLen == Integer.MAX_VALUE ||
+				normalizableKeyPrefixLen > keyBytes;
+	}
+
+	@Override
+	public void putNormalizedKey(Row record, MemorySegment target, int offset, int numBytes) {
+		int bytesLeft = numBytes; // remaining budget of key bytes
+		int currentOffset = offset;
+
+		for (int i = 0; i < numLeadingNormalizableKeys && bytesLeft > 0; i++) {
+			int len = normalizedKeyLengths[i];
+			len = bytesLeft >= len ? len : bytesLeft; // truncate the last key if the budget runs out
+
+			TypeComparator<Object> comparator = comparators[i];
+			Object element = record.getField(keyPositions[i]);  // element can be null
+			// let the key's comparator write its normalized key into the next len bytes
+			comparator.putNormalizedKey(element, target, currentOffset, len);
+
+			bytesLeft -= len;
+			currentOffset += len;
+		}
+
+	}
+
+	@Override
+	public void writeWithKeyNormalization(Row record, DataOutputView target) throws IOException { // unsupported, always throws
+		throw new UnsupportedOperationException(
+			"Record serialization with leading normalized keys not supported.");
+	}
+
+	/**
+	 * Not supported by this comparator.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public Row readWithKeyDenormalization(Row reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException(
+			"Record deserialization with leading normalized keys not supported.");
+	}
+
+	/**
+	 * Returns the stored inversion flag of the normalized key.
+	 * NOTE(review): presumably the flag derived from the first key comparator in
+	 * createAuxiliaryFields - confirm against the constructor.
+	 */
+	@Override
+	public boolean invertNormalizedKey() {
+		return invertNormKey;
+	}
+
+	/**
+	 * Creates a new comparator whose field comparators and field serializers are duplicates of
+	 * the ones held by this instance; all other settings are handed over unchanged.
+	 */
+	@Override
+	public TypeComparator<Row> duplicate() {
+		final int numComparators = comparators.length;
+		NullAwareComparator<?>[] freshComparators = new NullAwareComparator<?>[numComparators];
+		for (int c = 0; c < numComparators; c++) {
+			freshComparators[c] = (NullAwareComparator<?>) comparators[c].duplicate();
+		}
+
+		final int numSerializers = serializers.length;
+		TypeSerializer<?>[] freshSerializers = new TypeSerializer<?>[numSerializers];
+		for (int s = 0; s < numSerializers; s++) {
+			freshSerializers[s] = serializers[s].duplicate();
+		}
+
+		return new RowComparator(
+			arity,
+			keyPositions,
+			(NullAwareComparator<Object>[]) freshComparators,
+			(TypeSerializer<Object>[]) freshSerializers,
+			normalizedKeyLengths,
+			numLeadingNormalizableKeys,
+			normalizableKeyPrefixLen,
+			invertNormKey);
+	}
+
+	/**
+	 * Lets every field comparator extract the keys of its key field of {@code record} into
+	 * {@code target}, starting at position {@code index}.
+	 *
+	 * @return the sum of the values returned by the field comparators
+	 */
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		int extracted = 0;
+		for (int k = 0; k < comparators.length; k++) {
+			// the key field value may be null
+			Object keyField = ((Row) record).getField(keyPositions[k]);
+			extracted += comparators[k].extractKeys(keyField, target, index + extracted);
+		}
+		return extracted;
+	}
+
+
+	/**
+	 * Builds an array holding one fresh instance per field serializer, in serializer order.
+	 */
+	private Object[] instantiateDeserializationFields() {
+		final Object[] instances = new Object[serializers.length];
+		int pos = 0;
+		for (TypeSerializer<?> serializer : serializers) {
+			instances[pos++] = serializer.createInstance();
+		}
+		return instances;
+	}
+
+	/**
+	 * Derives the auxiliary values for normalized key support from the key comparators.
+	 *
+	 * <p>Keys are taken into the normalizable prefix in order until a comparator does not support
+	 * normalized keys, disagrees with the first comparator on the inversion direction, or the
+	 * accumulated prefix length overflows.
+	 *
+	 * @return a tuple of (normalized key length per key, number of leading normalizable keys,
+	 *         length of the normalizable key prefix, inversion flag of the normalized key)
+	 */
+	private static Tuple4<int[], Integer, Integer, Boolean>
+	createAuxiliaryFields(int[] keyPositions, NullAwareComparator<Object>[] comparators) {
+
+		final int[] keyLengths = new int[keyPositions.length];
+		int leadingKeys = 0;
+		int prefixLen = 0;
+		boolean inverted = false;
+
+		for (int i = 0; i < keyPositions.length; i++) {
+			final NullAwareComparator<Object> current = comparators[i];
+
+			// the composite key can only grow while the leading keys support normalized keys
+			if (!current.supportsNormalizedKey()) {
+				break;
+			}
+
+			if (i == 0) {
+				// the first comparator decides whether the key direction needs to be inverted
+				inverted = current.invertNormalizedKey();
+			} else if (current.invertNormalizedKey() != inverted) {
+				// a successor that disagrees on the inversion direction cannot be part of the
+				// normalized key
+				break;
+			}
+
+			leadingKeys++;
+			final int len = current.getNormalizeKeyLen();
+			if (len < 0) {
+				throw new RuntimeException(
+					"Comparator " + current.getClass().getName() +
+					" specifies an invalid length for the normalized key: " + len);
+			}
+			keyLengths[i] = len;
+			prefixLen += len;
+			if (prefixLen < 0) {
+				// overflow, which means we are out of budget for normalized key space anyways
+				prefixLen = Integer.MAX_VALUE;
+				break;
+			}
+		}
+
+		return new Tuple4<>(keyLengths, leadingKeys, prefixLen, inverted);
+	}
+
+	/**
+	 * Wraps each given comparator into a {@link NullAwareComparator}, pairing it with the order
+	 * flag at the same index of {@code orders}. Both arrays must have the same length, which is
+	 * verified with {@code checkArgument}.
+	 */
+	private static NullAwareComparator<Object>[] makeNullAware(
+		TypeComparator<Object>[] comparators,
+		boolean[] orders) {
+
+		final int count = comparators.length;
+		checkArgument(count == orders.length);
+
+		NullAwareComparator<?>[] wrapped = new NullAwareComparator<?>[count];
+		for (int pos = 0; pos < count; pos++) {
+			wrapped[pos] = new NullAwareComparator<Object>(comparators[pos], orders[pos]);
+		}
+		return (NullAwareComparator<Object>[]) wrapped;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
new file mode 100644
index 0000000..5457c05
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Serializer for {@link Row}.
+ *
+ * <p>The serialized form consists of a null mask followed by the serialized non-null fields in
+ * field order.
+ *
+ * <p>Each instance owns a scratch null mask that is overwritten by the deserialize and binary
+ * copy methods. An instance must therefore not be used by several threads at the same time;
+ * {@link #duplicate()} provides an independent instance.
+ */
+public class RowSerializer extends TypeSerializer<Row> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Scratch buffer for the null mask of the record that is currently being read. */
+	private final boolean[] nullMask;
+
+	/** The serializers of the row fields, indexed by field position. */
+	private final TypeSerializer<Object>[] fieldSerializers;
+
+	/**
+	 * Creates a serializer for rows whose arity equals the number of given field serializers.
+	 *
+	 * @param fieldSerializers the serializers of the row fields, must not be null
+	 */
+	@SuppressWarnings("unchecked")
+	public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
+		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+		this.nullMask = new boolean[fieldSerializers.length];
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	/**
+	 * Creates an independent copy of this serializer.
+	 *
+	 * <p>A new instance is returned even if all field serializers are stateless, because this
+	 * serializer is stateful itself: its null mask buffer is mutated while deserializing and
+	 * copying, so sharing one instance between threads could corrupt the mask.
+	 */
+	@Override
+	public TypeSerializer<Row> duplicate() {
+		TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
+		for (int i = 0; i < fieldSerializers.length; i++) {
+			duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
+		}
+		return new RowSerializer(duplicateFieldSerializers);
+	}
+
+	@Override
+	public Row createInstance() {
+		return new Row(fieldSerializers.length);
+	}
+
+	/**
+	 * Copies {@code from} field by field into a new row; null fields stay null.
+	 *
+	 * @throws RuntimeException if the arity of {@code from} differs from the number of field
+	 *                          serializers
+	 */
+	@Override
+	public Row copy(Row from) {
+		int len = fieldSerializers.length;
+
+		if (from.getArity() != len) {
+			throw new RuntimeException("Row arity of from does not match serializers.");
+		}
+
+		Row result = new Row(len);
+		for (int i = 0; i < len; i++) {
+			Object fromField = from.getField(i);
+			if (fromField != null) {
+				Object copy = fieldSerializers[i].copy(fromField);
+				result.setField(i, copy);
+			} else {
+				result.setField(i, null);
+			}
+		}
+		return result;
+	}
+
+	/**
+	 * Copies {@code from} field by field into {@code reuse}. If {@code reuse} is null, a new row
+	 * is created instead. Non-null fields of {@code reuse} are offered to the field serializers
+	 * for reuse.
+	 *
+	 * @throws RuntimeException if the arity of {@code from} or {@code reuse} differs from the
+	 *                          number of field serializers
+	 */
+	@Override
+	public Row copy(Row from, Row reuse) {
+		int len = fieldSerializers.length;
+
+		// cannot reuse, do a non-reuse copy
+		if (reuse == null) {
+			return copy(from);
+		}
+
+		if (from.getArity() != len || reuse.getArity() != len) {
+			throw new RuntimeException(
+				"Row arity of reuse or from is incompatible with this RowSerializer.");
+		}
+
+		for (int i = 0; i < len; i++) {
+			Object fromField = from.getField(i);
+			if (fromField != null) {
+				Object reuseField = reuse.getField(i);
+				if (reuseField != null) {
+					Object copy = fieldSerializers[i].copy(fromField, reuseField);
+					reuse.setField(i, copy);
+				} else {
+					Object copy = fieldSerializers[i].copy(fromField);
+					reuse.setField(i, copy);
+				}
+			} else {
+				reuse.setField(i, null);
+			}
+		}
+		return reuse;
+	}
+
+	/**
+	 * Returns {@code -1}.
+	 */
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	/**
+	 * Writes the null mask of {@code record} followed by all of its non-null fields.
+	 *
+	 * @throws RuntimeException if the arity of {@code record} differs from the number of field
+	 *                          serializers
+	 */
+	@Override
+	public void serialize(Row record, DataOutputView target) throws IOException {
+		int len = fieldSerializers.length;
+
+		if (record.getArity() != len) {
+			throw new RuntimeException("Row arity of from does not match serializers.");
+		}
+
+		// write a null mask
+		writeNullMask(len, record, target);
+
+		// serialize non-null fields
+		for (int i = 0; i < len; i++) {
+			Object o = record.getField(i);
+			if (o != null) {
+				fieldSerializers[i].serialize(o, target);
+			}
+		}
+	}
+
+	/**
+	 * Reads the null mask and then every field that is not marked as null into a new row.
+	 */
+	@Override
+	public Row deserialize(DataInputView source) throws IOException {
+		int len = fieldSerializers.length;
+
+		Row result = new Row(len);
+
+		// read null mask
+		readIntoNullMask(len, source, nullMask);
+
+		for (int i = 0; i < len; i++) {
+			if (nullMask[i]) {
+				result.setField(i, null);
+			} else {
+				result.setField(i, fieldSerializers[i].deserialize(source));
+			}
+		}
+
+		return result;
+	}
+
+	/**
+	 * Reads the null mask and then every field that is not marked as null into {@code reuse}.
+	 * Non-null fields of {@code reuse} are offered to the field serializers for reuse.
+	 *
+	 * @throws RuntimeException if the arity of {@code reuse} differs from the number of field
+	 *                          serializers
+	 */
+	@Override
+	public Row deserialize(Row reuse, DataInputView source) throws IOException {
+		int len = fieldSerializers.length;
+
+		if (reuse.getArity() != len) {
+			throw new RuntimeException("Row arity of from does not match serializers.");
+		}
+
+		// read null mask
+		readIntoNullMask(len, source, nullMask);
+
+		for (int i = 0; i < len; i++) {
+			if (nullMask[i]) {
+				reuse.setField(i, null);
+			} else {
+				Object reuseField = reuse.getField(i);
+				if (reuseField != null) {
+					reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source));
+				} else {
+					reuse.setField(i, fieldSerializers[i].deserialize(source));
+				}
+			}
+		}
+
+		return reuse;
+	}
+
+	/**
+	 * Copies one serialized row from {@code source} to {@code target}: first the null mask, then
+	 * every field that is not marked as null.
+	 */
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		int len = fieldSerializers.length;
+
+		// copy null mask
+		readIntoAndCopyNullMask(len, source, target, nullMask);
+
+		for (int i = 0; i < len; i++) {
+			if (!nullMask[i]) {
+				fieldSerializers[i].copy(source, target);
+			}
+		}
+	}
+
+	/**
+	 * Two row serializers are equal if they have the same number of field serializers and the
+	 * field serializers are pairwise equal.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (canEqual(obj)) {
+			RowSerializer other = (RowSerializer) obj;
+			if (this.fieldSerializers.length == other.fieldSerializers.length) {
+				for (int i = 0; i < this.fieldSerializers.length; i++) {
+					if (!this.fieldSerializers[i].equals(other.fieldSerializers[i])) {
+						return false;
+					}
+				}
+				return true;
+			}
+		}
+
+		return false;
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof RowSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return Arrays.hashCode(fieldSerializers);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
new file mode 100644
index 0000000..6825b71
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A Row can have an arbitrary number of fields, which may all be of different types. The fields
+ * in a Row can be null. Because Row is not strongly typed, Flink's type extraction mechanism
+ * cannot extract the correct field types, so users have to tell Flink the type information
+ * manually by creating a {@link RowTypeInfo}.
+ *
+ * <p>The fields of a Row are read by zero-based position with {@link #getField(int)} and
+ * written with {@link #setField(int, Object)}.
+ *
+ * <p>Row is in principle serializable. However, it may contain non-serializable fields,
+ * in which case serialization will fail.
+ */
+@PublicEvolving
+public class Row implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The array to store actual values. */
+	private final Object[] fields;
+
+	/**
+	 * Create a new Row instance.
+	 * @param arity The number of fields in the Row
+	 */
+	public Row(int arity) {
+		this.fields = new Object[arity];
+	}
+
+	/**
+	 * Get the number of fields in the Row.
+	 * @return The number of fields in the Row.
+	 */
+	public int getArity() {
+		return fields.length;
+	}
+
+	/**
+	 * Gets the field at the specified position.
+	 * @param pos The position of the field, 0-based.
+	 * @return The field at the specified position.
+	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+	 */
+	public Object getField(int pos) {
+		return fields[pos];
+	}
+
+	/**
+	 * Sets the field at the specified position.
+	 *
+	 * @param pos The position of the field, 0-based.
+	 * @param value The value to be assigned to the field at the specified position.
+	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+	 */
+	public void setField(int pos, Object value) {
+		fields[pos] = value;
+	}
+
+	/**
+	 * Returns the fields joined by commas; array-valued fields are printed by content.
+	 */
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		for (int i = 0; i < fields.length; i++) {
+			if (i > 0) {
+				sb.append(",");
+			}
+			sb.append(StringUtils.arrayAwareToString(fields[i]));
+		}
+		return sb.toString();
+	}
+
+	/**
+	 * Two rows are equal if they have the same class and pairwise equal fields. Array-valued
+	 * fields are compared by content, so that e.g. two rows holding distinct but equal
+	 * {@code byte[]} instances are considered equal.
+	 */
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		Row row = (Row) o;
+
+		// deepEquals instead of equals: array fields must not be compared by identity
+		return Arrays.deepEquals(fields, row.fields);
+	}
+
+	/**
+	 * Content-based hash code, consistent with {@link #equals(Object)} for array-valued fields.
+	 */
+	@Override
+	public int hashCode() {
+		return Arrays.deepHashCode(fields);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
new file mode 100644
index 0000000..8de7bf7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests equality, inequality and nested field access of {@link RowTypeInfo}.
+ */
+public class RowTypeInfoTest {
+
+	/** Type infos built from the same field types must be equal and share a hash code. */
+	@Test
+	public void testRowTypeInfoEquality() {
+		final RowTypeInfo left =
+			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		final RowTypeInfo right =
+			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		assertEquals(left, right);
+		assertEquals(left.hashCode(), right.hashCode());
+	}
+
+	/** Type infos that differ in one field type must not be equal. */
+	@Test
+	public void testRowTypeInfoInequality() {
+		final RowTypeInfo intAndString =
+			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+		final RowTypeInfo intAndBoolean =
+			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+		assertNotEquals(intAndString, intAndBoolean);
+		// note: distinct hash codes are not required by the hashCode contract, this check relies
+		// on the concrete hash implementations of the involved type infos
+		assertNotEquals(intAndString.hashCode(), intAndBoolean.hashCode());
+	}
+
+	/** Nested rows must be addressable through field expressions such as {@code f1.f0}. */
+	@Test
+	public void testNestedRowTypeInfo() {
+		final RowTypeInfo inner =
+			new RowTypeInfo(BasicTypeInfo.SHORT_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO);
+		final RowTypeInfo outer =
+			new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, inner, BasicTypeInfo.STRING_TYPE_INFO);
+
+		assertEquals("Row(f0: Short, f1: BigDecimal)", outer.getTypeAt("f1").toString());
+		assertEquals("Short", outer.getTypeAt("f1.f0").toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
new file mode 100644
index 0000000..ca54bd4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.Row;
+import org.junit.BeforeClass;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Runs the {@link ComparatorTestBase} checks against the comparator created by a
+ * {@link RowTypeInfo} with basic, tuple and POJO fields, including null fields.
+ */
+public class RowComparatorTest extends ComparatorTestBase<Row> {
+
+	private static final RowTypeInfo typeInfo = new RowTypeInfo(
+		BasicTypeInfo.INT_TYPE_INFO,
+		BasicTypeInfo.DOUBLE_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.BOOLEAN_TYPE_INFO,
+			BasicTypeInfo.SHORT_TYPE_INFO
+		),
+		TypeExtractor.createTypeInfo(MyPojo.class));
+
+	private static MyPojo testPojo1 = new MyPojo();
+	private static MyPojo testPojo2 = new MyPojo();
+	private static MyPojo testPojo3 = new MyPojo();
+
+	/** Test rows, listed in ascending order. */
+	private static final Row[] data = new Row[]{
+		createRow(null, null, null, null, null),
+		createRow(0, null, null, null, null),
+		createRow(0, 0.0, null, null, null),
+		createRow(0, 0.0, "a", null, null),
+		createRow(1, 0.0, "a", null, null),
+		createRow(1, 1.0, "a", null, null),
+		createRow(1, 1.0, "b", null, null),
+		createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+		createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+	};
+
+	@BeforeClass
+	public static void init() {
+		// TODO we cannot test null here as PojoComparator has no support for null keys
+		testPojo1.name = "";
+		testPojo2.name = "Test1";
+		testPojo3.name = "Test2";
+	}
+
+	/**
+	 * Asserts that both rows have the same arity and pairwise equal fields.
+	 */
+	@Override
+	protected void deepEquals(String message, Row should, Row is) {
+		int arity = should.getArity();
+		assertEquals(message, arity, is.getArity());
+		for (int i = 0; i < arity; i++) {
+			// expected value first, so that failure messages report the values the right way round
+			assertEquals(message, should.getField(i), is.getField(i));
+		}
+	}
+
+	@Override
+	protected TypeComparator<Row> createComparator(boolean ascending) {
+		// NOTE(review): seven key positions for five top-level fields - presumably the positions
+		// address the flattened fields (tuple and POJO members included); confirm against
+		// RowTypeInfo#createComparator
+		return typeInfo.createComparator(
+			new int[] {0, 1, 2, 3, 4, 5, 6},
+			new boolean[] {ascending, ascending, ascending, ascending, ascending, ascending, ascending},
+			0,
+			new ExecutionConfig());
+	}
+
+	@Override
+	protected TypeSerializer<Row> createSerializer() {
+		return typeInfo.createSerializer(new ExecutionConfig());
+	}
+
+	@Override
+	protected Row[] getSortedTestData() {
+		return data;
+	}
+
+	@Override
+	protected boolean supportsNullKeys() {
+		return true;
+	}
+
+	/** Creates a row of arity five from the given field values. */
+	private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
+		Row row = new Row(5);
+		row.setField(0, f0);
+		row.setField(1, f1);
+		row.setField(2, f2);
+		row.setField(3, f3);
+		row.setField(4, f4);
+		return row;
+	}
+
+	/** Simple POJO with a single name field, ordered and compared by that name. */
+	public static class MyPojo implements Serializable, Comparable<MyPojo> {
+		// we cannot use null because the PojoComparator does not support null properly
+		public String name = "";
+
+		@Override
+		public int compareTo(MyPojo o) {
+			if (name == null && o.name == null) {
+				return 0;
+			} else if (name == null) {
+				return -1;
+			} else if (o.name == null) {
+				return 1;
+			} else {
+				return name.compareTo(o.name);
+			}
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			MyPojo myPojo = (MyPojo) o;
+
+			return name != null ? name.equals(myPojo.name) : myPojo.name == null;
+
+		}
+
+		/** Hash code derived from the name, keeping it consistent with {@link #equals(Object)}. */
+		@Override
+		public int hashCode() {
+			return name != null ? name.hashCode() : 0;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
new file mode 100644
index 0000000..d0fdbd6
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorWithManyFieldsTests.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.RowComparator;
+import org.apache.flink.types.Row;
+import org.junit.BeforeClass;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link RowComparator} for wide rows: the rows have ten string fields, of which only the
+ * first one is used as key.
+ */
+public class RowComparatorWithManyFieldsTests extends ComparatorTestBase<Row> {
+
+	private static final int numberOfFields = 10;
+	private static RowTypeInfo typeInfo;
+
+	/** Test rows, listed in ascending order of their first field (null first). */
+	private static final Row[] data = new Row[]{
+		createRow(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", "j0"),
+		createRow("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", "j1"),
+		createRow("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", "j2"),
+		createRow("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3")
+	};
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		TypeInformation<?>[] fieldTypes = new TypeInformation[numberOfFields];
+		for (int i = 0; i < numberOfFields; i++) {
+			fieldTypes[i] = BasicTypeInfo.STRING_TYPE_INFO;
+		}
+		typeInfo = new RowTypeInfo(fieldTypes);
+
+	}
+
+	/**
+	 * Asserts that both rows have the same arity and pairwise equal fields.
+	 */
+	@Override
+	protected void deepEquals(String message, Row should, Row is) {
+		int arity = should.getArity();
+		assertEquals(message, arity, is.getArity());
+		for (int i = 0; i < arity; i++) {
+			// expected value first, so that failure messages report the values the right way round
+			assertEquals(message, should.getField(i), is.getField(i));
+		}
+	}
+
+	@Override
+	protected TypeComparator<Row> createComparator(boolean ascending) {
+		return typeInfo.createComparator(
+			new int[]{0},
+			new boolean[]{ascending},
+			0,
+			new ExecutionConfig());
+	}
+
+	@Override
+	protected TypeSerializer<Row> createSerializer() {
+		return typeInfo.createSerializer(new ExecutionConfig());
+	}
+
+	@Override
+	protected Row[] getSortedTestData() {
+		return data;
+	}
+
+	@Override
+	protected boolean supportsNullKeys() {
+		return true;
+	}
+
+	/** Creates a row from exactly {@code numberOfFields} values. */
+	private static Row createRow(Object... values) {
+		checkNotNull(values);
+		checkArgument(values.length == numberOfFields);
+		Row row = new Row(numberOfFields);
+		for (int i = 0; i < values.length; i++) {
+			row.setField(i, values[i]);
+		}
+		return row;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
new file mode 100644
index 0000000..d08d68a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+public class RowSerializerTest {
+
+	@Test
+	public void testRowSerializer() {
+		TypeInformation<Row> typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+		Row row1 = new Row(2);
+		row1.setField(0, 1);
+		row1.setField(1, "a");
+
+		Row row2 = new Row(2);
+		row2.setField(0, 2);
+		row2.setField(1, null);
+
+		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+		RowSerializerTestInstance instance = new RowSerializerTestInstance(serializer, row1, row2);
+		instance.testAll();
+	}
+
+	@Test
+	public void testLargeRowSerializer() {
+		TypeInformation<Row> typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		Row row = new Row(13);
+		row.setField(0, 2);
+		row.setField(1, null);
+		row.setField(3, null);
+		row.setField(4, null);
+		row.setField(5, null);
+		row.setField(6, null);
+		row.setField(7, null);
+		row.setField(8, null);
+		row.setField(9, null);
+		row.setField(10, null);
+		row.setField(11, null);
+		row.setField(12, "Test");
+
+		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+		RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, row);
+		testInstance.testAll();
+	}
+
+	@Test
+	public void testRowSerializerWithComplexTypes() {
+		TypeInformation<Row> typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			new TupleTypeInfo<Tuple3<Integer, Boolean, Short>>(
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.BOOLEAN_TYPE_INFO,
+				BasicTypeInfo.SHORT_TYPE_INFO),
+			TypeExtractor.createTypeInfo(MyPojo.class));
+
+		MyPojo testPojo1 = new MyPojo();
+		testPojo1.name = null;
+		MyPojo testPojo2 = new MyPojo();
+		testPojo2.name = "Test1";
+		MyPojo testPojo3 = new MyPojo();
+		testPojo3.name = "Test2";
+
+		Row[] data = new Row[]{
+			createRow(null, null, null, null, null),
+			createRow(0, null, null, null, null),
+			createRow(0, 0.0, null, null, null),
+			createRow(0, 0.0, "a", null, null),
+			createRow(1, 0.0, "a", null, null),
+			createRow(1, 1.0, "a", null, null),
+			createRow(1, 1.0, "b", null, null),
+			createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+			createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+		};
+
+		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+		RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, data);
+		testInstance.testAll();
+	}
+
+	// ----------------------------------------------------------------------------------------------
+
+	private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
+		Row row = new Row(5);
+		row.setField(0, f0);
+		row.setField(1, f1);
+		row.setField(2, f2);
+		row.setField(3, f3);
+		row.setField(4, f4);
+		return row;
+	}
+
+
+	private class RowSerializerTestInstance extends SerializerTestInstance<Row> {
+
+		RowSerializerTestInstance(
+			TypeSerializer<Row> serializer,
+			Row... testData) {
+			super(serializer, Row.class, -1, testData);
+		}
+
+		@Override
+		protected void deepEquals(String message, Row should, Row is) {
+			int arity = should.getArity();
+			assertEquals(message, arity, is.getArity());
+			for (int i = 0; i < arity; i++) {
+				Object copiedValue = should.getField(i);
+				Object element = is.getField(i);
+				assertEquals(message, element, copiedValue);
+			}
+		}
+	}
+
+	public static class MyPojo implements Serializable, Comparable<MyPojo> {
+		public String name = null;
+
+		@Override
+		public int compareTo(MyPojo o) {
+			if (name == null && o.name == null) {
+				return 0;
+			} else if (name == null) {
+				return -1;
+			} else if (o.name == null) {
+				return 1;
+			} else {
+				return name.compareTo(o.name);
+			}
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			MyPojo myPojo = (MyPojo) o;
+
+			return name != null ? name.equals(myPojo.name) : myPojo.name == null;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86f8a255/flink-core/src/test/java/org/apache/flink/types/RowTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
new file mode 100644
index 0000000..35ba32d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.types;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link Row}.
+ */
+public class RowTest {
+
+	/**
+	 * Checks the string form of a row holding an int, Strings, a null and a
+	 * nested tuple.
+	 */
+	@Test
+	public void testRowToString() {
+		final String expected = "1,hello,null,(2,hi),hello world";
+		final Object[] values = {1, "hello", null, new Tuple2<>(2, "hi"), "hello world"};
+
+		final Row mixed = new Row(values.length);
+		for (int pos = 0; pos < values.length; pos++) {
+			mixed.setField(pos, values[pos]);
+		}
+
+		assertEquals(expected, mixed.toString());
+	}
+}