You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/11/20 15:30:20 UTC
[3/3] incubator-flink git commit: [FLINK-1252] Add support for
serializing Date and Enums in the Java API
[FLINK-1252] Add support for serializing Date and Enums in the Java API
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/591f16dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/591f16dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/591f16dd
Branch: refs/heads/master
Commit: 591f16dd8f80cc2a5b2fdc6654c3c2d625119faa
Parents: 4203bf9
Author: Robert Metzger <me...@web.de>
Authored: Wed Nov 19 00:41:54 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Nov 20 15:29:44 2014 +0100
----------------------------------------------------------------------
.../api/common/typeinfo/BasicTypeInfo.java | 6 +-
.../common/typeutils/base/DateComparator.java | 89 +++++++++++++++
.../common/typeutils/base/DateSerializer.java | 85 ++++++++++++++
.../common/typeutils/base/EnumComparator.java | 88 +++++++++++++++
.../common/typeutils/base/EnumSerializer.java | 112 +++++++++++++++++++
.../typeutils/base/DateComparatorTest.java | 60 ++++++++++
.../typeutils/base/DateSerializerTest.java | 53 +++++++++
.../flink/api/java/typeutils/EnumTypeInfo.java | 101 +++++++++++++++++
.../flink/api/java/typeutils/TypeExtractor.java | 4 +-
.../type/extractor/PojoTypeExtractionTest.java | 2 +-
.../test/javaApiOperators/ReduceITCase.java | 66 ++++++++++-
.../util/CollectionDataSets.java | 29 ++++-
12 files changed, 689 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index a152b4a..f27da07 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.common.typeinfo;
import java.lang.reflect.Constructor;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -30,6 +31,8 @@ import org.apache.flink.api.common.typeutils.base.ByteComparator;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.CharComparator;
import org.apache.flink.api.common.typeutils.base.CharSerializer;
+import org.apache.flink.api.common.typeutils.base.DateComparator;
+import org.apache.flink.api.common.typeutils.base.DateSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleComparator;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.FloatComparator;
@@ -58,6 +61,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
public static final BasicTypeInfo<Float> FLOAT_TYPE_INFO = new BasicTypeInfo<Float>(Float.class, FloatSerializer.INSTANCE, FloatComparator.class);
public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new BasicTypeInfo<Double>(Double.class, DoubleSerializer.INSTANCE, DoubleComparator.class);
public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, CharSerializer.INSTANCE, CharComparator.class);
+ public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, DateSerializer.INSTANCE, DateComparator.class);
// --------------------------------------------------------------------------------------------
@@ -146,7 +150,6 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
throw new NullPointerException();
}
- @SuppressWarnings("unchecked")
BasicTypeInfo<X> info = (BasicTypeInfo<X>) TYPES.get(type);
return info;
}
@@ -181,5 +184,6 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
TYPES.put(double.class, DOUBLE_TYPE_INFO);
TYPES.put(Character.class, CHAR_TYPE_INFO);
TYPES.put(char.class, CHAR_TYPE_INFO);
+ TYPES.put(Date.class, DATE_TYPE_INFO);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
new file mode 100644
index 0000000..4bc37f2
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateComparator.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+import java.util.Date;
+
+
+public final class DateComparator extends BasicTypeComparator<Date> {
+
+ private static final long serialVersionUID = 1L;
+
+
+ public DateComparator(boolean ascending) {
+ super(ascending);
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ long l1 = firstSource.readLong();
+ long l2 = secondSource.readLong();
+ int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1));
+ return ascendingComparison ? comp : -comp;
+ }
+
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return 8;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < 8;
+ }
+
+ @Override
+ public void putNormalizedKey(Date lValue, MemorySegment target, int offset, int numBytes) {
+ long value = lValue.getTime() - Long.MIN_VALUE;
+
+ // see IntValue for an explanation of the logic
+ if (numBytes == 8) {
+ // default case, full normalized key
+ target.putLongBigEndian(offset, value);
+ }
+ else if (numBytes <= 0) {
+ }
+ else if (numBytes < 8) {
+ for (int i = 0; numBytes > 0; numBytes--, i++) {
+ target.put(offset + i, (byte) (value >>> ((7-i)<<3)));
+ }
+ }
+ else {
+ target.putLongBigEndian(offset, value);
+ for (int i = 8; i < numBytes; i++) {
+ target.put(offset + i, (byte) 0);
+ }
+ }
+ }
+
+ @Override
+ public DateComparator duplicate() {
+ return new DateComparator(ascendingComparison);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
new file mode 100644
index 0000000..4bd2ea8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Date;
+
+
+public final class DateSerializer extends TypeSerializerSingleton<Date> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final DateSerializer INSTANCE = new DateSerializer();
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public boolean isStateful() {
+ return false;
+ }
+
+ @Override
+ public Date createInstance() {
+ return new Date();
+ }
+
+ @Override
+ public Date copy(Date from) {
+ return new Date(from.getTime());
+ }
+
+ @Override
+ public Date copy(Date from, Date reuse) {
+ reuse.setTime(from.getTime());
+ return reuse;
+ }
+
+ @Override
+ public int getLength() {
+ return 8;
+ }
+
+ @Override
+ public void serialize(Date record, DataOutputView target) throws IOException {
+ target.writeLong(record.getTime());
+ }
+
+ @Override
+ public Date deserialize(DataInputView source) throws IOException {
+ return new Date(source.readLong());
+ }
+
+ @Override
+ public Date deserialize(Date reuse, DataInputView source) throws IOException {
+ reuse.setTime(source.readLong());
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeLong(source.readLong());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
new file mode 100644
index 0000000..af6d30d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+
+
+public final class EnumComparator<T extends Enum<T>> extends BasicTypeComparator<T> {
+
+ private static final long serialVersionUID = 1L;
+
+
+ public EnumComparator(boolean ascending) {
+ super(ascending);
+ }
+
+ @Override
+ public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+ int i1 = firstSource.readInt();
+ int i2 = secondSource.readInt();
+ int comp = (i1 < i2 ? -1 : (i1 == i2 ? 0 : 1));
+ return ascendingComparison ? comp : -comp;
+ }
+
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return 4;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < 4;
+ }
+
+ @Override
+ public void putNormalizedKey(T iValue, MemorySegment target, int offset, int numBytes) {
+ int value = iValue.ordinal() - Integer.MIN_VALUE;
+
+ // see IntValue for an explanation of the logic
+ if (numBytes == 4) {
+ // default case, full normalized key
+ target.putIntBigEndian(offset, value);
+ }
+ else if (numBytes <= 0) {
+ }
+ else if (numBytes < 4) {
+ for (int i = 0; numBytes > 0; numBytes--, i++) {
+ target.put(offset + i, (byte) (value >>> ((3-i)<<3)));
+ }
+ }
+ else {
+ target.putLongBigEndian(offset, value);
+ for (int i = 4; i < numBytes; i++) {
+ target.put(offset + i, (byte) 0);
+ }
+ }
+ }
+
+ @Override
+ public EnumComparator duplicate() {
+ return new EnumComparator(ascendingComparison);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
new file mode 100644
index 0000000..b46e956
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.lang.reflect.Method;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+
+public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient T[] values;
+
+ private final Class<T> enumClass;
+
+ public EnumSerializer(Class<T> enumClass) {
+ this.enumClass = enumClass;
+ this.values = createValues(enumClass);
+ }
+
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public boolean isStateful() {
+ return false;
+ }
+
+ @Override
+ public T createInstance() {
+ return values[0];
+ }
+
+ @Override
+ public T copy(T from) {
+ return from;
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return 4;
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ target.writeInt(record.ordinal());
+ }
+
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ return values[source.readInt()];
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ return values[source.readInt()];
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.write(source, 4);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ this.values = createValues(this.enumClass);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T[] createValues(Class<T> enumClass) {
+ try {
+ Method valuesMethod = enumClass.getMethod("values");
+ return (T[]) valuesMethod.invoke(null);
+
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Cannot access the constants of the enum " + enumClass.getName());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateComparatorTest.java
new file mode 100644
index 0000000..b989713
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateComparatorTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Date;
+import java.util.Random;
+
+public class DateComparatorTest extends ComparatorTestBase<Date> {
+
+ @Override
+ protected TypeComparator<Date> createComparator(boolean ascending) {
+ return new DateComparator(ascending);
+ }
+
+ @Override
+ protected TypeSerializer<Date> createSerializer() {
+ return new DateSerializer();
+ }
+
+ @Override
+ protected Date[] getSortedTestData() {
+ Random rnd = new Random(874597969123412338L);
+ long rndLong = rnd.nextLong();
+ if (rndLong < 0) {
+ rndLong = -rndLong;
+ }
+ if (rndLong == Long.MAX_VALUE) {
+ rndLong -= 3;
+ }
+ if (rndLong <= 2) {
+ rndLong += 3;
+ }
+ return new Date[]{
+ new Date(0L),
+ new Date(1L),
+ new Date(2L),
+ new Date(rndLong),
+ new Date(Long.MAX_VALUE)};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateSerializerTest.java
new file mode 100644
index 0000000..f7737b0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DateSerializerTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * A test for the {@link org.apache.flink.api.common.typeutils.base.DateSerializer}.
+ */
+public class DateSerializerTest extends SerializerTestBase<Date> {
+
+ @Override
+ protected TypeSerializer<Date> createSerializer() {
+ return new DateSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 8;
+ }
+
+ @Override
+ protected Class<Date> getTypeClass() {
+ return Date.class;
+ }
+
+ @Override
+ protected Date[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ long rndLong = rnd.nextLong();
+ return new Date[] {new Date(0L), new Date(1L),new Date(Long.MAX_VALUE), new Date(rndLong),};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
new file mode 100644
index 0000000..c19dfe3
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.EnumComparator;
+import org.apache.flink.api.common.typeutils.base.EnumSerializer;
+
+public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implements AtomicType<T> {
+
+ private final Class<T> typeClass;
+
+ public EnumTypeInfo(Class<T> typeClass) {
+ if (typeClass == null) {
+ throw new NullPointerException();
+ }
+ if (!Enum.class.isAssignableFrom(typeClass) ) {
+ throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
+ }
+ this.typeClass = typeClass;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public TypeComparator<T> createComparator(boolean sortOrderAscending) {
+ return new EnumComparator<T>(sortOrderAscending);
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ public Class<T> getTypeClass() {
+ return this.typeClass;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return true;
+ }
+
+ @Override
+ public TypeSerializer<T> createSerializer() {
+ return new EnumSerializer<T>(typeClass);
+ }
+
+ @Override
+ public String toString() {
+ return "EnumTypeInfo<" + typeClass.getName() + ">";
+ }
+
+ @Override
+ public int hashCode() {
+ return typeClass.hashCode() ^ 0xd3a2646c;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == EnumTypeInfo.class) {
+ return typeClass == ((EnumTypeInfo<?>) obj).typeClass;
+ } else {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index cf3751c..55faff1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -877,7 +877,6 @@ public class TypeExtractor {
}
if (clazz.equals(Object.class)) {
- // TODO (merging): better throw an exception here. the runtime does not support it yet
return new GenericTypeInfo<X>(clazz);
}
@@ -924,6 +923,9 @@ public class TypeExtractor {
throw new InvalidTypesException("Type information extraction for tuples cannot be done based on the class.");
}
+ if(Enum.class.isAssignableFrom(clazz)) {
+ return new EnumTypeInfo(clazz);
+ }
if (alreadySeen.contains(clazz)) {
return new GenericTypeInfo<X>(clazz);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index da74d4b..e5ac1ca 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -288,7 +288,7 @@ public class PojoTypeExtractionTest {
Assert.fail("already seen");
}
dateSeen = true;
- Assert.assertEquals(new GenericTypeInfo<Date>(Date.class), field.type);
+ Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.type);
Assert.assertEquals(Date.class, field.type.getTypeClass());
} else if(name.equals("someNumber")) {
if(intSeen) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index a1957f9..38a7a0e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -21,8 +21,11 @@ package org.apache.flink.test.javaApiOperators;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
+import java.util.Date;
import java.util.LinkedList;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -34,7 +37,10 @@ import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.PojoWithDateAndEnum;
import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -42,7 +48,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class ReduceITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 10;
+ private static int NUM_PROGRAMS = 11;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -330,7 +336,63 @@ public class ReduceITCase extends JavaProgramTestBase {
"5,11,10,GHI,1\n" +
"5,29,0,P-),2\n" +
"5,25,0,P-),3\n";
- }
+ }
+ case 11: {
+ /**
+ * Test support for Date and enum serialization
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<PojoWithDateAndEnum> ds = env.generateSequence(0,2).map(new MapFunction<Long, PojoWithDateAndEnum>() {
+ @Override
+ public PojoWithDateAndEnum map(Long value) throws Exception {
+ int l = value.intValue();
+ switch (l) {
+ case 0:
+ PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+ one.group = "a";
+ one.date = new Date(666);
+ one.cat = CollectionDataSets.Category.CAT_A;
+ return one;
+ case 1:
+ PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+ two.group = "a";
+ two.date = new Date(666);
+ two.cat = CollectionDataSets.Category.CAT_A;
+ return two;
+ case 2:
+ PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+ three.group = "b";
+ three.date = new Date(666);
+ three.cat = CollectionDataSets.Category.CAT_B;
+ return three;
+ }
+ throw new RuntimeException("Unexpected value for l=" + l);
+ }
+ });
+ DataSet<String> res = ds.groupBy("group").reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithDateAndEnum, String>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reduce(Iterable<PojoWithDateAndEnum> values,
+ Collector<String> out) throws Exception {
+ for(PojoWithDateAndEnum val : values) {
+ if(val.cat == CollectionDataSets.Category.CAT_A) {
+ Assert.assertEquals("a", val.group);
+ } else if(val.cat == CollectionDataSets.Category.CAT_B) {
+ Assert.assertEquals("b", val.group);
+ } else {
+ Assert.fail("error");
+ }
+ Assert.assertEquals(666, val.date.getTime());
+ }
+ out.collect("ok");
+ }
+ });
+
+ res.writeAsText(resultPath);
+ env.execute();
+ return "ok\nok";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/591f16dd/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
index 0f8097a..d6a8f40 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -532,5 +531,33 @@ public class CollectionDataSets {
return env.fromCollection(data);
}
+ public enum Category {
+ CAT_A, CAT_B;
+ }
+
+ public static class PojoWithDateAndEnum {
+ public String group;
+ public Date date;
+ public Category cat;
+ }
+
+ public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
+ List<PojoWithDateAndEnum> data = new ArrayList<PojoWithDateAndEnum>();
+
+ PojoWithDateAndEnum one = new PojoWithDateAndEnum();
+ one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A;
+ data.add(one);
+
+ PojoWithDateAndEnum two = new PojoWithDateAndEnum();
+ two.group = "a"; two.date = new Date(666); //two.cat = Category.CAT_A;
+ data.add(two);
+
+ PojoWithDateAndEnum three = new PojoWithDateAndEnum();
+ three.group = "b"; three.date = new Date(666); //three.cat = Category.CAT_B;
+ data.add(three);
+
+ return env.fromCollection(data);
+ }
+
}