You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/10 17:32:58 UTC
flink git commit: [FLINK-1567] Add switch to use the AvroSerializer for GenericTypeInfo
Repository: flink
Updated Branches:
refs/heads/release-0.8 88c7ea256 -> 9f18cbb3a
[FLINK-1567] Add switch to use the AvroSerializer for GenericTypeInfo
This closes #413
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f18cbb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f18cbb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f18cbb3
Branch: refs/heads/release-0.8
Commit: 9f18cbb3a680e72a21197406059a420857dee5f8
Parents: 88c7ea2
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Feb 17 12:05:05 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Mar 10 14:02:45 2015 +0100
----------------------------------------------------------------------
.../apache/flink/api/io/avro/AvroPojoTest.java | 68 +++++++++++++++++++-
.../api/java/typeutils/GenericAvroTypeInfo.java | 40 ++++++++++++
.../api/java/typeutils/GenericTypeInfo.java | 11 +++-
.../flink/api/java/typeutils/PojoTypeInfo.java | 2 +-
.../java/typeutils/runtime/AvroSerializer.java | 25 ++++++-
5 files changed, 140 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
index b91a3d8..b3321c8 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -22,8 +22,12 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.io.avro.generated.User;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericAvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
@@ -51,7 +55,6 @@ public class AvroPojoTest extends JavaProgramTestBase {
}
private File inFile;
- private String expected;
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -97,7 +100,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
return "";
}
- private static int NUM_PROGRAMS = 3;
+ private static int NUM_PROGRAMS = 5;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -146,6 +149,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
in = new Path(inFile.getAbsoluteFile().toURI());
AvroInputFormat<User> users1 = new AvroInputFormat<User>(in, User.class);
+ Assert.assertTrue(users1.getProducedType() instanceof PojoTypeInfo);
DataSet<User> usersDS1 = env.createInput(users1)
// null map type because the order changes in different JVMs (hard to test)
.map(new MapFunction<User, User>() {
@@ -163,7 +167,65 @@ public class AvroPojoTest extends JavaProgramTestBase {
return "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" +
"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n";
+ case 4:
+ /**
+ * Test GenericTypeInfo with Avro serialization.
+ */
+ env = ExecutionEnvironment.getExecutionEnvironment();
+ GenericTypeInfo.USE_AVRO_SERIALIZER = true;
+ in = new Path(inFile.getAbsoluteFile().toURI());
+
+ AvroInputFormat<User> users2 = new AvroInputFormat<User>(in, User.class);
+ DataSet<User> usersDS2 = env.createInput(users2, new GenericTypeInfo<User>(User.class));
+
+ DataSet<Tuple2<String, Integer>> res2 = usersDS2.groupBy(new KeySelector<User, String>() {
+ @Override
+ public String getKey(User value) throws Exception {
+ return String.valueOf(value.getName());
+ }
+ }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+ @Override
+ public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (User u : values) {
+ out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ }
+ }
+ });
+ res2.writeAsText(resultPath);
+ env.execute("Avro Key selection");
+
+
+ return "(Alyssa,1)\n(Charlie,1)\n";
+ case 5:
+ /**
+ * Test GenericAvroTypeInfo with Avro serialization.
+ */
+ env = ExecutionEnvironment.getExecutionEnvironment();
+ in = new Path(inFile.getAbsoluteFile().toURI());
+
+ AvroInputFormat<User> users3 = new AvroInputFormat<User>(in, User.class);
+ DataSet<User> usersDS3 = env.createInput(users3, new GenericAvroTypeInfo<User>(User.class));
+
+ DataSet<Tuple2<String, Integer>> res3 = usersDS3.groupBy(new KeySelector<User, String>() {
+ @Override
+ public String getKey(User value) throws Exception {
+ return String.valueOf(value.getName());
+ }
+ }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+ @Override
+ public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (User u : values) {
+ out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+ }
+ }
+ });
+
+ res3.writeAsText(resultPath);
+ env.execute("Avro Key selection");
+
+
+ return "(Alyssa,1)\n(Charlie,1)\n";
default:
throw new RuntimeException("Unknown test");
}
@@ -176,7 +238,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
}
@Parameterized.Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+ public static Collection<Object[]> getConfigurations() throws IOException {
LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
new file mode 100644
index 0000000..6e3eebd
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
+
+
+/**
+ * Type information that behaves like the regular GenericTypeInfo (which uses
+ * Kryo by default) but always serializes with the AvroSerializer.
+ */
+public class GenericAvroTypeInfo<T> extends GenericTypeInfo<T> {
+
+ public GenericAvroTypeInfo(Class<T> typeClass) {
+ super(typeClass);
+ }
+
+ @Override
+ public TypeSerializer<T> createSerializer() {
+ return new AvroSerializer<T>(getTypeClass());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 5bc6cb9..30688b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
@@ -29,9 +30,13 @@ import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
private final Class<T> typeClass;
+ public static boolean USE_AVRO_SERIALIZER = false;
+
+ private boolean useAvroSerializer;
public GenericTypeInfo(Class<T> typeClass) {
this.typeClass = typeClass;
+ this.useAvroSerializer = USE_AVRO_SERIALIZER;
}
@Override
@@ -66,7 +71,11 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
@Override
public TypeSerializer<T> createSerializer() {
- return new KryoSerializer<T>(this.typeClass);
+ if(useAvroSerializer) {
+ return new AvroSerializer<T>(this.typeClass);
+ } else {
+ return new KryoSerializer<T>(this.typeClass);
+ }
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 0103a7b..30a8df2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -34,9 +34,9 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import com.google.common.base.Joiner;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 2758bd6..d5573cb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -18,8 +18,15 @@
package org.apache.flink.api.java.typeutils.runtime;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -91,7 +98,22 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
@Override
public T copy(T from) {
checkKryoInitialized();
- return this.kryo.copy(from);
+ try {
+ return this.kryo.copy(from);
+ } catch(KryoException ke) {
+ // Kryo could not copy the object directly, so fall back to copying it via a serialization round trip:
+ ByteArrayOutputStream baout = new ByteArrayOutputStream();
+ Output output = new Output(baout);
+
+ kryo.writeObject(output, from);
+
+ output.close();
+
+ ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
+ Input input = new Input(bain);
+
+ return (T)kryo.readObject(input, from.getClass());
+ }
}
@Override
@@ -154,6 +176,7 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
+ this.kryo.register(GenericData.Array.class, new KryoSerializer.SpecificInstanceCollectionSerializer(ArrayList.class));
this.kryo.setAsmEnabled(true);
this.kryo.register(type);
}