You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 23:37:33 UTC

[5/5] flink git commit: [FLINK-2565] Support primitive arrays as keys

[FLINK-2565] Support primitive arrays as keys

This closes #1043


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0807eec0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0807eec0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0807eec0

Branch: refs/heads/master
Commit: 0807eec0cb1acf8052a77b6133387e25399fce08
Parents: 1e38d6f
Author: zentol <s....@web.de>
Authored: Sun Aug 23 15:36:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 20:17:10 2015 +0200

----------------------------------------------------------------------
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  47 +++++--
 .../array/BooleanPrimitiveArrayComparator.java  |  56 +++++++++
 .../array/BytePrimitiveArrayComparator.java     |  56 +++++++++
 .../array/CharPrimitiveArrayComparator.java     |  56 +++++++++
 .../array/DoublePrimitiveArrayComparator.java   |  57 +++++++++
 .../array/FloatPrimitiveArrayComparator.java    |  56 +++++++++
 .../base/array/IntPrimitiveArrayComparator.java |  56 +++++++++
 .../array/LongPrimitiveArrayComparator.java     |  56 +++++++++
 .../base/array/PrimitiveArrayComparator.java    | 121 +++++++++++++++++++
 .../array/ShortPrimitiveArrayComparator.java    |  56 +++++++++
 .../BooleanPrimitiveArrayComparatorTest.java    |  45 +++++++
 .../array/BytePrimitiveArrayComparatorTest.java |  44 +++++++
 .../array/CharPrimitiveArrayComparatorTest.java |  42 +++++++
 .../DoublePrimitiveArrayComparatorTest.java     |  44 +++++++
 .../FloatPrimitiveArrayComparatorTest.java      |  44 +++++++
 .../array/IntPrimitiveArrayComparatorTest.java  |  44 +++++++
 .../array/LongPrimitiveArrayComparatorTest.java |  44 +++++++
 .../array/PrimitiveArrayComparatorTestBase.java |  41 +++++++
 .../ShortPrimitiveArrayComparatorTest.java      |  44 +++++++
 .../flink/api/java/operator/GroupingTest.java   |  14 ++-
 .../javaApiOperators/GroupReduceITCase.java     |  31 +++++
 .../util/CollectionDataSets.java                |  18 +++
 22 files changed, 1057 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 83126ab..3843f28 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -24,13 +24,22 @@ import java.util.Map;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator;
+import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
 
 /**
@@ -39,19 +48,18 @@ import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySeria
  *
  * @param <T> The type represented by this type information, e.g., int[], double[], long[]
  */
-public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
-	
+public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
+
 	private static final long serialVersionUID = 1L;
 
-	public static final PrimitiveArrayTypeInfo<boolean[]> BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<boolean[]>(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<byte[]> BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<byte[]>(byte[].class, BytePrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<short[]> SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<short[]>(short[].class, ShortPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<int[]> INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<int[]>(int[].class, IntPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<long[]> LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<long[]>(long[].class, LongPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<float[]> FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<float[]>(float[].class, FloatPrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<double[]> DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<double[]>(double[].class, DoublePrimitiveArraySerializer.INSTANCE);
-	public static final PrimitiveArrayTypeInfo<char[]> CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<char[]>(char[].class, CharPrimitiveArraySerializer.INSTANCE);
-	
+	public static final PrimitiveArrayTypeInfo<boolean[]> BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<boolean[]>(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE, BooleanPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<byte[]> BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<byte[]>(byte[].class, BytePrimitiveArraySerializer.INSTANCE, BytePrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<short[]> SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<short[]>(short[].class, ShortPrimitiveArraySerializer.INSTANCE, ShortPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<int[]> INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<int[]>(int[].class, IntPrimitiveArraySerializer.INSTANCE, IntPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<long[]> LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<long[]>(long[].class, LongPrimitiveArraySerializer.INSTANCE, LongPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<float[]> FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<float[]>(float[].class, FloatPrimitiveArraySerializer.INSTANCE, FloatPrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<double[]> DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<double[]>(double[].class, DoublePrimitiveArraySerializer.INSTANCE, DoublePrimitiveArrayComparator.class);
+	public static final PrimitiveArrayTypeInfo<char[]> CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<char[]>(char[].class, CharPrimitiveArraySerializer.INSTANCE, CharPrimitiveArrayComparator.class);
 	// --------------------------------------------------------------------------------------------
 	
 	/** The class of the array (such as int[].class) */
@@ -60,12 +68,15 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 	/** The serializer for the array */
 	private final TypeSerializer<T> serializer;
 
+	/** The class of the comparator for the array */
+	private Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass;
+
 	/**
 	 * Creates a new type info for a 
 	 * @param arrayClass The class of the array (such as int[].class)
 	 * @param serializer The serializer for the array.
 	 */
-	private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer) {
+	private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer, Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass) {
 		if (arrayClass == null || serializer == null) {
 			throw new NullPointerException();
 		}
@@ -74,6 +85,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 		}
 		this.arrayClass = arrayClass;
 		this.serializer = serializer;
+		this.comparatorClass = comparatorClass;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -105,7 +117,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 
 	@Override
 	public boolean isKeyType() {
-		return false;
+		return true;
 	}
 
 	@Override
@@ -161,4 +173,13 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
 		TYPES.put(double[].class, DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO);
 		TYPES.put(char[].class, CHAR_PRIMITIVE_ARRAY_TYPE_INFO);
 	}
+
+	@Override
+	public PrimitiveArrayComparator<T, ?> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+		try {
+			return comparatorClass.getConstructor(boolean.class).newInstance(sortOrderAscending);
+		} catch (Exception e) {
+			throw new RuntimeException("Could not initialize primitive " + comparatorClass.getName() + " array comparator.", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
new file mode 100644
index 0000000..b7487b8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.BooleanComparator;
+
+public class BooleanPrimitiveArrayComparator extends PrimitiveArrayComparator<boolean[], BooleanComparator> {
+	public BooleanPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new BooleanComparator(ascending));
+	}
+
+	@Override
+	public int hash(boolean[] record) {
+		int result = 0;
+		for (boolean field : record) {
+			result += field ? 1231 : 1237;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(boolean[] first, boolean[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = (second[x] == first[x] ? 0 : (first[x] ? 1 : -1));
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<boolean[]> duplicate() {
+		BooleanPrimitiveArrayComparator dupe = new BooleanPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
new file mode 100644
index 0000000..d914c3e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.ByteComparator;
+
+public class BytePrimitiveArrayComparator extends PrimitiveArrayComparator<byte[], ByteComparator> {
+	public BytePrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new ByteComparator(ascending));
+	}
+
+	@Override
+	public int hash(byte[] record) {
+		int result = 0;
+		for (byte field : record) {
+			result += (int) field;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(byte[] first, byte[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] - second[x];
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<byte[]> duplicate() {
+		BytePrimitiveArrayComparator dupe = new BytePrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
new file mode 100644
index 0000000..d734152
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.CharComparator;
+
+public class CharPrimitiveArrayComparator extends PrimitiveArrayComparator<char[], CharComparator> {
+	public CharPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new CharComparator(ascending));
+	}
+
+	@Override
+	public int hash(char[] record) {
+		int result = 0;
+		for (char field : record) {
+			result += (int) field;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(char[] first, char[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] - second[x];
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<char[]> duplicate() {
+		CharPrimitiveArrayComparator dupe = new CharPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
new file mode 100644
index 0000000..5153fa5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+
+public class DoublePrimitiveArrayComparator extends PrimitiveArrayComparator<double[], DoubleComparator> {
+	public DoublePrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new DoubleComparator(ascending));
+	}
+
+	@Override
+	public int hash(double[] record) {
+		int result = 0;
+		for (double field : record) {
+			long bits = Double.doubleToLongBits(field);
+			result += (int) (bits ^ (bits >>> 32));
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(double[] first, double[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = Double.compare(first[x], second[x]);
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<double[]> duplicate() {
+		DoublePrimitiveArrayComparator dupe = new DoublePrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
new file mode 100644
index 0000000..5a5986e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.FloatComparator;
+
+public class FloatPrimitiveArrayComparator extends PrimitiveArrayComparator<float[], FloatComparator> {
+	public FloatPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new FloatComparator(ascending));
+	}
+
+	@Override
+	public int hash(float[] record) {
+		int result = 0;
+		for (float field : record) {
+			result += Float.floatToIntBits(field);
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(float[] first, float[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = Float.compare(first[x], second[x]);
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<float[]> duplicate() {
+		FloatPrimitiveArrayComparator dupe = new FloatPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
new file mode 100644
index 0000000..78cb2ae
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+
+public class IntPrimitiveArrayComparator extends PrimitiveArrayComparator<int[], IntComparator> {
+	public IntPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new IntComparator(ascending));
+	}
+
+	@Override
+	public int hash(int[] record) {
+		int result = 0;
+		for (int field : record) {
+			result += field;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(int[] first, int[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] - second[x];
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<int[]> duplicate() {
+		IntPrimitiveArrayComparator dupe = new IntPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
new file mode 100644
index 0000000..c0a69bc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+
+public class LongPrimitiveArrayComparator extends PrimitiveArrayComparator<long[], LongComparator> {
+	public LongPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new LongComparator(ascending));
+	}
+
+	@Override
+	public int hash(long[] record) {
+		int result = 0;
+		for (long field : record) {
+			result += (int) (field ^ (field >>> 32));
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(long[] first, long[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] < second[x] ? -1 : (first[x] == second[x] ? 0 : 1);
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<long[]> duplicate() {
+		LongPrimitiveArrayComparator dupe = new LongPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
new file mode 100644
index 0000000..ba53aff
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import java.io.IOException;
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.BasicTypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+public abstract class PrimitiveArrayComparator<T, C extends BasicTypeComparator> extends TypeComparator<T> {
+	// For use by getComparators
+	@SuppressWarnings("rawtypes")
+	private final TypeComparator[] comparators = new TypeComparator[]{this};
+
+	protected final boolean ascending;
+	protected transient T reference;
+	protected final C comparator;
+
+	public PrimitiveArrayComparator(boolean ascending, C comparator) {
+		this.ascending = ascending;
+		this.comparator = comparator;
+	}
+
+	@Override
+	public void setReference(T toCompare) {
+		this.reference = toCompare;
+	}
+
+	@Override
+	public boolean equalToReference(T candidate) {
+		return compare(this.reference, candidate) == 0;
+	}
+
+	@Override
+	public int compareToReference(TypeComparator<T> referencedComparator) {
+		return compare(((PrimitiveArrayComparator<T, C>) referencedComparator).reference, this.reference);
+	}
+
+	@Override
+	public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+		int firstCount = firstSource.readInt();
+		int secondCount = secondSource.readInt();
+		for (int x = 0; x < min(firstCount, secondCount); x++) {
+			int cmp = comparator.compareSerialized(firstSource, secondSource);
+			if (cmp != 0) {
+				return cmp;
+			}
+		}
+		int cmp = firstCount - secondCount;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public int extractKeys(Object record, Object[] target, int index) {
+		target[index] = record;
+		return 1;
+	}
+
+	@Override
+	public TypeComparator[] getFlatComparators() {
+		return comparators;
+	}
+
+	@Override
+	public boolean supportsNormalizedKey() {
+		return false;
+	}
+
+	@Override
+	public boolean supportsSerializationWithKeyNormalization() {
+		return false;
+	}
+
+	@Override
+	public int getNormalizeKeyLen() {
+		return 0;
+	}
+
+	@Override
+	public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean invertNormalizedKey() {
+		return !ascending;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
new file mode 100644
index 0000000..5943694
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.ShortComparator;
+
+public class ShortPrimitiveArrayComparator extends PrimitiveArrayComparator<short[], ShortComparator> {
+	public ShortPrimitiveArrayComparator(boolean ascending) {
+		super(ascending, new ShortComparator(ascending));
+	}
+
+	@Override
+	public int hash(short[] record) {
+		int result = 0;
+		for (short field : record) {
+			result += (int) field;
+		}
+		return result;
+	}
+
+	@Override
+	public int compare(short[] first, short[] second) {
+		for (int x = 0; x < min(first.length, second.length); x++) {
+			int cmp = first[x] - second[x];
+			if (cmp != 0) {
+				return ascending ? cmp : -cmp;
+			}
+		}
+		int cmp = first.length - second.length;
+		return ascending ? cmp : -cmp;
+	}
+
+	@Override
+	public TypeComparator<short[]> duplicate() {
+		ShortPrimitiveArrayComparator dupe = new ShortPrimitiveArrayComparator(this.ascending);
+		dupe.setReference(this.reference);
+		return dupe;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..4db71bf
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class BooleanPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<boolean[]> {
+	public BooleanPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, boolean[] should, boolean[] is) {
+		Assert.assertTrue(should.length == is.length);
+		for(int x=0; x< should.length; x++) {
+			Assert.assertEquals(should[x], is[x]);
+		}
+	}
+
+	@Override
+	protected boolean[][] getSortedTestData() {
+		return new boolean[][]{
+			new boolean[]{false, false},
+			new boolean[]{false, true},
+			new boolean[]{false, true, true},
+			new boolean[]{true},
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..4c57702
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class BytePrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<byte[]> {
+	public BytePrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, byte[] should, byte[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected byte[][] getSortedTestData() {
+		return new byte[][]{
+			new byte[]{-1, 0},
+			new byte[]{0, -1},
+			new byte[]{0, 0},
+			new byte[]{0, 1},
+			new byte[]{0, 1, 2},
+			new byte[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..b318168
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class CharPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<char[]> {
+	public CharPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, char[] should, char[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected char[][] getSortedTestData() {
+		return new char[][]{
+			new char[]{0, 0},
+			new char[]{0, 1},
+			new char[]{0, 1, 2},
+			new char[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..b5d7e1d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class DoublePrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<double[]> {
+	public DoublePrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, double[] should, double[] is) {
+		Assert.assertArrayEquals(message, should, is, 0.00001);
+	}
+
+	@Override
+	protected double[][] getSortedTestData() {
+		return new double[][]{
+			new double[]{-1, 0},
+			new double[]{0, -1},
+			new double[]{0, 0},
+			new double[]{0, 1},
+			new double[]{0, 1, 2},
+			new double[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..830049e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class FloatPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<float[]> {
+	public FloatPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, float[] should, float[] is) {
+		Assert.assertArrayEquals(message, should, is, (float) 0.00001);
+	}
+
+	@Override
+	protected float[][] getSortedTestData() {
+		return new float[][]{
+			new float[]{-1, 0},
+			new float[]{0, -1},
+			new float[]{0, 0},
+			new float[]{0, 1},
+			new float[]{0, 1, 2},
+			new float[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..6c05f23
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class IntPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<int[]> {
+	public IntPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, int[] should, int[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected int[][] getSortedTestData() {
+		return new int[][]{
+			new int[]{-1, 0},
+			new int[]{0, -1},
+			new int[]{0, 0},
+			new int[]{0, 1},
+			new int[]{0, 1, 2},
+			new int[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..0ae573e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class LongPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<long[]> {
+	public LongPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, long[] should, long[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected long[][] getSortedTestData() {
+		return new long[][]{
+			new long[]{-1, 0},
+			new long[]{0, -1},
+			new long[]{0, 0},
+			new long[]{0, 1},
+			new long[]{0, 1, 2},
+			new long[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
new file mode 100644
index 0000000..ff620dd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public abstract class PrimitiveArrayComparatorTestBase<T> extends ComparatorTestBase<T> {
+	private PrimitiveArrayTypeInfo<T> info;
+
+	public PrimitiveArrayComparatorTestBase(PrimitiveArrayTypeInfo<T> info) {
+		this.info = info;
+	}
+
+	@Override
+	protected TypeComparator<T> createComparator(boolean ascending) {
+		return info.createComparator(ascending, null).duplicate();
+	}
+
+	@Override
+	protected TypeSerializer<T> createSerializer() {
+		return info.createSerializer(null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..5b48dc2
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class ShortPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<short[]> {
+	public ShortPrimitiveArrayComparatorTest() {
+		super(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO);
+	}
+
+	@Override
+	protected void deepEquals(String message, short[] should, short[] is) {
+		Assert.assertArrayEquals(message, should, is);
+	}
+
+	@Override
+	protected short[][] getSortedTestData() {
+		return new short[][]{
+			new short[]{-1, 0},
+			new short[]{0, -1},
+			new short[]{0, 0},
+			new short[]{0, 1},
+			new short[]{0, 1, 2},
+			new short[]{2}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index b3922b3..bdad3db 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -67,8 +67,9 @@ public class GroupingTest {
 
 	private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData =
 			new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
-
 	
+	private final List<Tuple2<byte[], byte[]>> byteArrayData = new ArrayList<Tuple2<byte[], byte[]>>();
+
 	@Test  
 	public void testGroupByKeyFields1() {
 		
@@ -127,6 +128,15 @@ public class GroupingTest {
 	}
 
 	@Test
+	public void testGroupByKeyFieldsOnPrimitiveArray() {
+		this.byteArrayData.add(new Tuple2(new byte[]{0}, new byte[]{1}));
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<byte[], byte[]>> tupleDs = env.fromCollection(byteArrayData);
+		tupleDs.groupBy(0);
+	}
+
+	@Test
 	public void testGroupByKeyExpressions1() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -613,7 +623,7 @@ public class GroupingTest {
 	public static class CustomType2 implements Serializable {
 
 		public int myInt;
-		public int[] myIntArray;
+		public Integer[] myIntArray;
 
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 260de1c..95a8cb0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -62,6 +62,37 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testCorrectnessofGroupReduceOnTupleContainingPrimitiveByteArrayWithKeyFieldSelectors() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<byte[], Integer>> ds = CollectionDataSets.getTuple2WithByteArrayDataSet(env);
+		DataSet<Integer> reduceDs = ds.
+				groupBy(0).reduceGroup(new ByteArrayGroupReduce());
+
+		List<Integer> result = reduceDs.collect();
+
+		String expected = "0\n"
+				+ "1\n"
+				+ "2\n"
+				+ "3\n"
+				+ "4\n";
+
+		compareResultAsText(result, expected);
+
+	}
+
+	public static class ByteArrayGroupReduce implements GroupReduceFunction<Tuple2<byte[], Integer>, Integer> {
+		@Override
+		public void reduce(Iterable<Tuple2<byte[], Integer>> values, Collector<Integer> out) throws Exception {
+			int sum = 0;
+			for (Tuple2<byte[], Integer> value : values) {
+				sum += value.f1;
+			}
+			out.collect(sum);
+		}
+	}
+
+	@Test
 	public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector() throws Exception{
 		/*
 		 * check correctness of groupReduce on tuples with key field selector

http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 1faf4c1..9fb275f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
@@ -205,6 +206,23 @@ public class CollectionDataSets {
 
 		return env.fromCollection(data, type);
 	}
+	
+	public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
+		List<Tuple2<byte[], Integer>> data = new ArrayList<Tuple2<byte[], Integer>>();
+		data.add(new Tuple2<byte[], Integer>(new byte[]{0, 4}, 1));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0}, 1));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0, 4}, 4));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{2, 1}, 3));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{0}, 0));
+		data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0}, 1));
+				
+		TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<Tuple2<byte[], Integer>>(
+				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO
+		);
+		
+		return env.fromCollection(data, type);
+	}
 
 	public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {