You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 23:37:33 UTC
[5/5] flink git commit: [FLINK-2565] Support primitive arrays as keys
[FLINK-2565] Support primitive arrays as keys
This closes #1043
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0807eec0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0807eec0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0807eec0
Branch: refs/heads/master
Commit: 0807eec0cb1acf8052a77b6133387e25399fce08
Parents: 1e38d6f
Author: zentol <s....@web.de>
Authored: Sun Aug 23 15:36:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 20:17:10 2015 +0200
----------------------------------------------------------------------
.../common/typeinfo/PrimitiveArrayTypeInfo.java | 47 +++++--
.../array/BooleanPrimitiveArrayComparator.java | 56 +++++++++
.../array/BytePrimitiveArrayComparator.java | 56 +++++++++
.../array/CharPrimitiveArrayComparator.java | 56 +++++++++
.../array/DoublePrimitiveArrayComparator.java | 57 +++++++++
.../array/FloatPrimitiveArrayComparator.java | 56 +++++++++
.../base/array/IntPrimitiveArrayComparator.java | 56 +++++++++
.../array/LongPrimitiveArrayComparator.java | 56 +++++++++
.../base/array/PrimitiveArrayComparator.java | 121 +++++++++++++++++++
.../array/ShortPrimitiveArrayComparator.java | 56 +++++++++
.../BooleanPrimitiveArrayComparatorTest.java | 45 +++++++
.../array/BytePrimitiveArrayComparatorTest.java | 44 +++++++
.../array/CharPrimitiveArrayComparatorTest.java | 42 +++++++
.../DoublePrimitiveArrayComparatorTest.java | 44 +++++++
.../FloatPrimitiveArrayComparatorTest.java | 44 +++++++
.../array/IntPrimitiveArrayComparatorTest.java | 44 +++++++
.../array/LongPrimitiveArrayComparatorTest.java | 44 +++++++
.../array/PrimitiveArrayComparatorTestBase.java | 41 +++++++
.../ShortPrimitiveArrayComparatorTest.java | 44 +++++++
.../flink/api/java/operator/GroupingTest.java | 14 ++-
.../javaApiOperators/GroupReduceITCase.java | 31 +++++
.../util/CollectionDataSets.java | 18 +++
22 files changed, 1057 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 83126ab..3843f28 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -24,13 +24,22 @@ import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.BooleanPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.DoublePrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.FloatPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
+import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator;
+import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
/**
@@ -39,19 +48,18 @@ import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySeria
*
* @param <T> The type represented by this type information, e.g., int[], double[], long[]
*/
-public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
-
+public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
+
private static final long serialVersionUID = 1L;
- public static final PrimitiveArrayTypeInfo<boolean[]> BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<boolean[]>(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE);
- public static final PrimitiveArrayTypeInfo<byte[]> BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<byte[]>(byte[].class, BytePrimitiveArraySerializer.INSTANCE);
- public static final PrimitiveArrayTypeInfo<short[]> SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<short[]>(short[].class, ShortPrimitiveArraySerializer.INSTANCE);
- public static final PrimitiveArrayTypeInfo<int[]> INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<int[]>(int[].class, IntPrimitiveArraySerializer.INSTANCE);
- public static final PrimitiveArrayTypeInfo<long[]> LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<long[]>(long[].class, LongPrimitiveArraySerializer.INSTANCE);
- public static final PrimitiveArrayTypeInfo<float[]> FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<float[]>(float[].class, FloatPrimitiveArraySerializer.INSTANCE);
- public static final PrimitiveArrayTypeInfo<double[]> DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<double[]>(double[].class, DoublePrimitiveArraySerializer.INSTANCE);
- public static final PrimitiveArrayTypeInfo<char[]> CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<char[]>(char[].class, CharPrimitiveArraySerializer.INSTANCE);
-
+ public static final PrimitiveArrayTypeInfo<boolean[]> BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<boolean[]>(boolean[].class, BooleanPrimitiveArraySerializer.INSTANCE, BooleanPrimitiveArrayComparator.class);
+ public static final PrimitiveArrayTypeInfo<byte[]> BYTE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<byte[]>(byte[].class, BytePrimitiveArraySerializer.INSTANCE, BytePrimitiveArrayComparator.class);
+ public static final PrimitiveArrayTypeInfo<short[]> SHORT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<short[]>(short[].class, ShortPrimitiveArraySerializer.INSTANCE, ShortPrimitiveArrayComparator.class);
+ public static final PrimitiveArrayTypeInfo<int[]> INT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<int[]>(int[].class, IntPrimitiveArraySerializer.INSTANCE, IntPrimitiveArrayComparator.class);
+ public static final PrimitiveArrayTypeInfo<long[]> LONG_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<long[]>(long[].class, LongPrimitiveArraySerializer.INSTANCE, LongPrimitiveArrayComparator.class);
+ public static final PrimitiveArrayTypeInfo<float[]> FLOAT_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<float[]>(float[].class, FloatPrimitiveArraySerializer.INSTANCE, FloatPrimitiveArrayComparator.class);
+ public static final PrimitiveArrayTypeInfo<double[]> DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<double[]>(double[].class, DoublePrimitiveArraySerializer.INSTANCE, DoublePrimitiveArrayComparator.class);
+ public static final PrimitiveArrayTypeInfo<char[]> CHAR_PRIMITIVE_ARRAY_TYPE_INFO = new PrimitiveArrayTypeInfo<char[]>(char[].class, CharPrimitiveArraySerializer.INSTANCE, CharPrimitiveArrayComparator.class);
// --------------------------------------------------------------------------------------------
/** The class of the array (such as int[].class) */
@@ -60,12 +68,15 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
/** The serializer for the array */
private final TypeSerializer<T> serializer;
+ /** The class of the comparator for the array */
+ private Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass;
+
/**
* Creates a new type info for a
* @param arrayClass The class of the array (such as int[].class)
* @param serializer The serializer for the array.
*/
- private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer) {
+ private PrimitiveArrayTypeInfo(Class<T> arrayClass, TypeSerializer<T> serializer, Class<? extends PrimitiveArrayComparator<T, ?>> comparatorClass) {
if (arrayClass == null || serializer == null) {
throw new NullPointerException();
}
@@ -74,6 +85,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
}
this.arrayClass = arrayClass;
this.serializer = serializer;
+ this.comparatorClass = comparatorClass;
}
// --------------------------------------------------------------------------------------------
@@ -105,7 +117,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
@Override
public boolean isKeyType() {
- return false;
+ return true;
}
@Override
@@ -161,4 +173,13 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> {
TYPES.put(double[].class, DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO);
TYPES.put(char[].class, CHAR_PRIMITIVE_ARRAY_TYPE_INFO);
}
+
+ @Override
+ public PrimitiveArrayComparator<T, ?> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
+ try {
+ return comparatorClass.getConstructor(boolean.class).newInstance(sortOrderAscending);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not initialize primitive " + comparatorClass.getName() + " array comparator.", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
new file mode 100644
index 0000000..b7487b8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.BooleanComparator;
+
+public class BooleanPrimitiveArrayComparator extends PrimitiveArrayComparator<boolean[], BooleanComparator> {
+ public BooleanPrimitiveArrayComparator(boolean ascending) {
+ super(ascending, new BooleanComparator(ascending));
+ }
+
+ @Override
+ public int hash(boolean[] record) {
+ int result = 0;
+ for (boolean field : record) {
+ result += field ? 1231 : 1237;
+ }
+ return result;
+ }
+
+ @Override
+ public int compare(boolean[] first, boolean[] second) {
+ for (int x = 0; x < min(first.length, second.length); x++) {
+ int cmp = (second[x] == first[x] ? 0 : (first[x] ? 1 : -1));
+ if (cmp != 0) {
+ return ascending ? cmp : -cmp;
+ }
+ }
+ int cmp = first.length - second.length;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public TypeComparator<boolean[]> duplicate() {
+ BooleanPrimitiveArrayComparator dupe = new BooleanPrimitiveArrayComparator(this.ascending);
+ dupe.setReference(this.reference);
+ return dupe;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
new file mode 100644
index 0000000..d914c3e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.ByteComparator;
+
+public class BytePrimitiveArrayComparator extends PrimitiveArrayComparator<byte[], ByteComparator> {
+ public BytePrimitiveArrayComparator(boolean ascending) {
+ super(ascending, new ByteComparator(ascending));
+ }
+
+ @Override
+ public int hash(byte[] record) {
+ int result = 0;
+ for (byte field : record) {
+ result += (int) field;
+ }
+ return result;
+ }
+
+ @Override
+ public int compare(byte[] first, byte[] second) {
+ for (int x = 0; x < min(first.length, second.length); x++) {
+ int cmp = first[x] - second[x];
+ if (cmp != 0) {
+ return ascending ? cmp : -cmp;
+ }
+ }
+ int cmp = first.length - second.length;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public TypeComparator<byte[]> duplicate() {
+ BytePrimitiveArrayComparator dupe = new BytePrimitiveArrayComparator(this.ascending);
+ dupe.setReference(this.reference);
+ return dupe;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
new file mode 100644
index 0000000..d734152
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.CharComparator;
+
+public class CharPrimitiveArrayComparator extends PrimitiveArrayComparator<char[], CharComparator> {
+ public CharPrimitiveArrayComparator(boolean ascending) {
+ super(ascending, new CharComparator(ascending));
+ }
+
+ @Override
+ public int hash(char[] record) {
+ int result = 0;
+ for (char field : record) {
+ result += (int) field;
+ }
+ return result;
+ }
+
+ @Override
+ public int compare(char[] first, char[] second) {
+ for (int x = 0; x < min(first.length, second.length); x++) {
+ int cmp = first[x] - second[x];
+ if (cmp != 0) {
+ return ascending ? cmp : -cmp;
+ }
+ }
+ int cmp = first.length - second.length;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public TypeComparator<char[]> duplicate() {
+ CharPrimitiveArrayComparator dupe = new CharPrimitiveArrayComparator(this.ascending);
+ dupe.setReference(this.reference);
+ return dupe;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
new file mode 100644
index 0000000..5153fa5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+
+public class DoublePrimitiveArrayComparator extends PrimitiveArrayComparator<double[], DoubleComparator> {
+ public DoublePrimitiveArrayComparator(boolean ascending) {
+ super(ascending, new DoubleComparator(ascending));
+ }
+
+ @Override
+ public int hash(double[] record) {
+ int result = 0;
+ for (double field : record) {
+ long bits = Double.doubleToLongBits(field);
+ result += (int) (bits ^ (bits >>> 32));
+ }
+ return result;
+ }
+
+ @Override
+ public int compare(double[] first, double[] second) {
+ for (int x = 0; x < min(first.length, second.length); x++) {
+ int cmp = Double.compare(first[x], second[x]);
+ if (cmp != 0) {
+ return ascending ? cmp : -cmp;
+ }
+ }
+ int cmp = first.length - second.length;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public TypeComparator<double[]> duplicate() {
+ DoublePrimitiveArrayComparator dupe = new DoublePrimitiveArrayComparator(this.ascending);
+ dupe.setReference(this.reference);
+ return dupe;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
new file mode 100644
index 0000000..5a5986e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.FloatComparator;
+
+public class FloatPrimitiveArrayComparator extends PrimitiveArrayComparator<float[], FloatComparator> {
+ public FloatPrimitiveArrayComparator(boolean ascending) {
+ super(ascending, new FloatComparator(ascending));
+ }
+
+ @Override
+ public int hash(float[] record) {
+ int result = 0;
+ for (float field : record) {
+ result += Float.floatToIntBits(field);
+ }
+ return result;
+ }
+
+ @Override
+ public int compare(float[] first, float[] second) {
+ for (int x = 0; x < min(first.length, second.length); x++) {
+ int cmp = Float.compare(first[x], second[x]);
+ if (cmp != 0) {
+ return ascending ? cmp : -cmp;
+ }
+ }
+ int cmp = first.length - second.length;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public TypeComparator<float[]> duplicate() {
+ FloatPrimitiveArrayComparator dupe = new FloatPrimitiveArrayComparator(this.ascending);
+ dupe.setReference(this.reference);
+ return dupe;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
new file mode 100644
index 0000000..78cb2ae
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+
+public class IntPrimitiveArrayComparator extends PrimitiveArrayComparator<int[], IntComparator> {
+ public IntPrimitiveArrayComparator(boolean ascending) {
+ super(ascending, new IntComparator(ascending));
+ }
+
+ @Override
+ public int hash(int[] record) {
+ int result = 0;
+ for (int field : record) {
+ result += field;
+ }
+ return result;
+ }
+
+ @Override
+ public int compare(int[] first, int[] second) {
+ for (int x = 0; x < min(first.length, second.length); x++) {
+ int cmp = first[x] - second[x];
+ if (cmp != 0) {
+ return ascending ? cmp : -cmp;
+ }
+ }
+ int cmp = first.length - second.length;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public TypeComparator<int[]> duplicate() {
+ IntPrimitiveArrayComparator dupe = new IntPrimitiveArrayComparator(this.ascending);
+ dupe.setReference(this.reference);
+ return dupe;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
new file mode 100644
index 0000000..c0a69bc
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+
+public class LongPrimitiveArrayComparator extends PrimitiveArrayComparator<long[], LongComparator> {
+ public LongPrimitiveArrayComparator(boolean ascending) {
+ super(ascending, new LongComparator(ascending));
+ }
+
+ @Override
+ public int hash(long[] record) {
+ int result = 0;
+ for (long field : record) {
+ result += (int) (field ^ (field >>> 32));
+ }
+ return result;
+ }
+
+ @Override
+ public int compare(long[] first, long[] second) {
+ for (int x = 0; x < min(first.length, second.length); x++) {
+ int cmp = first[x] < second[x] ? -1 : (first[x] == second[x] ? 0 : 1);
+ if (cmp != 0) {
+ return ascending ? cmp : -cmp;
+ }
+ }
+ int cmp = first.length - second.length;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public TypeComparator<long[]> duplicate() {
+ LongPrimitiveArrayComparator dupe = new LongPrimitiveArrayComparator(this.ascending);
+ dupe.setReference(this.reference);
+ return dupe;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
new file mode 100644
index 0000000..ba53aff
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparator.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import java.io.IOException;
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.BasicTypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+public abstract class PrimitiveArrayComparator<T, C extends BasicTypeComparator> extends TypeComparator<T> {
+ // For use by getComparators
+ @SuppressWarnings("rawtypes")
+ private final TypeComparator[] comparators = new TypeComparator[]{this};
+
+ protected final boolean ascending;
+ protected transient T reference;
+ protected final C comparator;
+
+ public PrimitiveArrayComparator(boolean ascending, C comparator) {
+ this.ascending = ascending;
+ this.comparator = comparator;
+ }
+
+ @Override
+ public void setReference(T toCompare) {
+ this.reference = toCompare;
+ }
+
+ @Override
+ public boolean equalToReference(T candidate) {
+ return compare(this.reference, candidate) == 0;
+ }
+
+ @Override
+ public int compareToReference(TypeComparator<T> referencedComparator) {
+ return compare(((PrimitiveArrayComparator<T, C>) referencedComparator).reference, this.reference);
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ int firstCount = firstSource.readInt();
+ int secondCount = secondSource.readInt();
+ for (int x = 0; x < min(firstCount, secondCount); x++) {
+ int cmp = comparator.compareSerialized(firstSource, secondSource);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ int cmp = firstCount - secondCount;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = record;
+ return 1;
+ }
+
+ @Override
+ public TypeComparator[] getFlatComparators() {
+ return comparators;
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return false;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return 0;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return !ascending;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
new file mode 100644
index 0000000..5943694
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import static java.lang.Math.min;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.base.ShortComparator;
+
+public class ShortPrimitiveArrayComparator extends PrimitiveArrayComparator<short[], ShortComparator> {
+ public ShortPrimitiveArrayComparator(boolean ascending) {
+ super(ascending, new ShortComparator(ascending));
+ }
+
+ @Override
+ public int hash(short[] record) {
+ int result = 0;
+ for (short field : record) {
+ result += (int) field;
+ }
+ return result;
+ }
+
+ @Override
+ public int compare(short[] first, short[] second) {
+ for (int x = 0; x < min(first.length, second.length); x++) {
+ int cmp = first[x] - second[x];
+ if (cmp != 0) {
+ return ascending ? cmp : -cmp;
+ }
+ }
+ int cmp = first.length - second.length;
+ return ascending ? cmp : -cmp;
+ }
+
+ @Override
+ public TypeComparator<short[]> duplicate() {
+ ShortPrimitiveArrayComparator dupe = new ShortPrimitiveArrayComparator(this.ascending);
+ dupe.setReference(this.reference);
+ return dupe;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..4db71bf
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArrayComparatorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class BooleanPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<boolean[]> {
+ public BooleanPrimitiveArrayComparatorTest() {
+ super(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Override
+ protected void deepEquals(String message, boolean[] should, boolean[] is) {
+ Assert.assertTrue(should.length == is.length);
+ for(int x=0; x< should.length; x++) {
+ Assert.assertEquals(should[x], is[x]);
+ }
+ }
+
+ @Override
+ protected boolean[][] getSortedTestData() {
+ return new boolean[][]{
+ new boolean[]{false, false},
+ new boolean[]{false, true},
+ new boolean[]{false, true, true},
+ new boolean[]{true},
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..4c57702
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class BytePrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<byte[]> {
+ public BytePrimitiveArrayComparatorTest() {
+ super(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Override
+ protected void deepEquals(String message, byte[] should, byte[] is) {
+ Assert.assertArrayEquals(message, should, is);
+ }
+
+ @Override
+ protected byte[][] getSortedTestData() {
+ return new byte[][]{
+ new byte[]{-1, 0},
+ new byte[]{0, -1},
+ new byte[]{0, 0},
+ new byte[]{0, 1},
+ new byte[]{0, 1, 2},
+ new byte[]{2}
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..b318168
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArrayComparatorTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class CharPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<char[]> {
+ public CharPrimitiveArrayComparatorTest() {
+ super(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Override
+ protected void deepEquals(String message, char[] should, char[] is) {
+ Assert.assertArrayEquals(message, should, is);
+ }
+
+ @Override
+ protected char[][] getSortedTestData() {
+ return new char[][]{
+ new char[]{0, 0},
+ new char[]{0, 1},
+ new char[]{0, 1, 2},
+ new char[]{2}
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..b5d7e1d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class DoublePrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<double[]> {
+ public DoublePrimitiveArrayComparatorTest() {
+ super(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Override
+ protected void deepEquals(String message, double[] should, double[] is) {
+ Assert.assertArrayEquals(message, should, is, 0.00001);
+ }
+
+ @Override
+ protected double[][] getSortedTestData() {
+ return new double[][]{
+ new double[]{-1, 0},
+ new double[]{0, -1},
+ new double[]{0, 0},
+ new double[]{0, 1},
+ new double[]{0, 1, 2},
+ new double[]{2}
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..830049e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class FloatPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<float[]> {
+ public FloatPrimitiveArrayComparatorTest() {
+ super(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Override
+ protected void deepEquals(String message, float[] should, float[] is) {
+ Assert.assertArrayEquals(message, should, is, (float) 0.00001);
+ }
+
+ @Override
+ protected float[][] getSortedTestData() {
+ return new float[][]{
+ new float[]{-1, 0},
+ new float[]{0, -1},
+ new float[]{0, 0},
+ new float[]{0, 1},
+ new float[]{0, 1, 2},
+ new float[]{2}
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..6c05f23
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class IntPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<int[]> {
+ public IntPrimitiveArrayComparatorTest() {
+ super(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Override
+ protected void deepEquals(String message, int[] should, int[] is) {
+ Assert.assertArrayEquals(message, should, is);
+ }
+
+ @Override
+ protected int[][] getSortedTestData() {
+ return new int[][]{
+ new int[]{-1, 0},
+ new int[]{0, -1},
+ new int[]{0, 0},
+ new int[]{0, 1},
+ new int[]{0, 1, 2},
+ new int[]{2}
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..0ae573e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class LongPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<long[]> {
+ public LongPrimitiveArrayComparatorTest() {
+ super(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Override
+ protected void deepEquals(String message, long[] should, long[] is) {
+ Assert.assertArrayEquals(message, should, is);
+ }
+
+ @Override
+ protected long[][] getSortedTestData() {
+ return new long[][]{
+ new long[]{-1, 0},
+ new long[]{0, -1},
+ new long[]{0, 0},
+ new long[]{0, 1},
+ new long[]{0, 1, 2},
+ new long[]{2}
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
new file mode 100644
index 0000000..ff620dd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/PrimitiveArrayComparatorTestBase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public abstract class PrimitiveArrayComparatorTestBase<T> extends ComparatorTestBase<T> {
+ private PrimitiveArrayTypeInfo<T> info;
+
+ public PrimitiveArrayComparatorTestBase(PrimitiveArrayTypeInfo<T> info) {
+ this.info = info;
+ }
+
+ @Override
+ protected TypeComparator<T> createComparator(boolean ascending) {
+ return info.createComparator(ascending, null).duplicate();
+ }
+
+ @Override
+ protected TypeSerializer<T> createSerializer() {
+ return info.createSerializer(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
new file mode 100644
index 0000000..5b48dc2
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArrayComparatorTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.typeutils.base.array;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.junit.Assert;
+
+public class ShortPrimitiveArrayComparatorTest extends PrimitiveArrayComparatorTestBase<short[]> {
+ public ShortPrimitiveArrayComparatorTest() {
+ super(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO);
+ }
+
+ @Override
+ protected void deepEquals(String message, short[] should, short[] is) {
+ Assert.assertArrayEquals(message, should, is);
+ }
+
+ @Override
+ protected short[][] getSortedTestData() {
+ return new short[][]{
+ new short[]{-1, 0},
+ new short[]{0, -1},
+ new short[]{0, 0},
+ new short[]{0, 1},
+ new short[]{0, 1, 2},
+ new short[]{2}
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index b3922b3..bdad3db 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -67,8 +67,9 @@ public class GroupingTest {
private final List<Tuple4<Integer, Long, CustomType, Long[]>> tupleWithCustomData =
new ArrayList<Tuple4<Integer, Long, CustomType, Long[]>>();
-
+ private final List<Tuple2<byte[], byte[]>> byteArrayData = new ArrayList<Tuple2<byte[], byte[]>>();
+
@Test
public void testGroupByKeyFields1() {
@@ -127,6 +128,15 @@ public class GroupingTest {
}
@Test
+ public void testGroupByKeyFieldsOnPrimitiveArray() {
+ this.byteArrayData.add(new Tuple2(new byte[]{0}, new byte[]{1}));
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<byte[], byte[]>> tupleDs = env.fromCollection(byteArrayData);
+ tupleDs.groupBy(0);
+ }
+
+ @Test
public void testGroupByKeyExpressions1() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -613,7 +623,7 @@ public class GroupingTest {
public static class CustomType2 implements Serializable {
public int myInt;
- public int[] myIntArray;
+ public Integer[] myIntArray;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 260de1c..95a8cb0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -62,6 +62,37 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
}
@Test
+ public void testCorrectnessofGroupReduceOnTupleContainingPrimitiveByteArrayWithKeyFieldSelectors() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<byte[], Integer>> ds = CollectionDataSets.getTuple2WithByteArrayDataSet(env);
+ DataSet<Integer> reduceDs = ds.
+ groupBy(0).reduceGroup(new ByteArrayGroupReduce());
+
+ List<Integer> result = reduceDs.collect();
+
+ String expected = "0\n"
+ + "1\n"
+ + "2\n"
+ + "3\n"
+ + "4\n";
+
+ compareResultAsText(result, expected);
+
+ }
+
+ public static class ByteArrayGroupReduce implements GroupReduceFunction<Tuple2<byte[], Integer>, Integer> {
+ @Override
+ public void reduce(Iterable<Tuple2<byte[], Integer>> values, Collector<Integer> out) throws Exception {
+ int sum = 0;
+ for (Tuple2<byte[], Integer> value : values) {
+ sum += value.f1;
+ }
+ out.collect(sum);
+ }
+ }
+
+ @Test
public void testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector() throws Exception{
/*
* check correctness of groupReduce on tuples with key field selector
http://git-wip-us.apache.org/repos/asf/flink/blob/0807eec0/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 1faf4c1..9fb275f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
@@ -205,6 +206,23 @@ public class CollectionDataSets {
return env.fromCollection(data, type);
}
+
+ public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
+ List<Tuple2<byte[], Integer>> data = new ArrayList<Tuple2<byte[], Integer>>();
+ data.add(new Tuple2<byte[], Integer>(new byte[]{0, 4}, 1));
+ data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0}, 1));
+ data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0, 4}, 4));
+ data.add(new Tuple2<byte[], Integer>(new byte[]{2, 1}, 3));
+ data.add(new Tuple2<byte[], Integer>(new byte[]{0}, 0));
+ data.add(new Tuple2<byte[], Integer>(new byte[]{2, 0}, 1));
+
+ TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<Tuple2<byte[], Integer>>(
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ return env.fromCollection(data, type);
+ }
public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {