You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/10 17:32:58 UTC

flink git commit: [FLINK-1567] Add switch to use the AvroSerializer for GenericTypeInfo

Repository: flink
Updated Branches:
  refs/heads/release-0.8 88c7ea256 -> 9f18cbb3a


[FLINK-1567] Add switch to use the AvroSerializer for GenericTypeInfo

This closes #413


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f18cbb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f18cbb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f18cbb3

Branch: refs/heads/release-0.8
Commit: 9f18cbb3a680e72a21197406059a420857dee5f8
Parents: 88c7ea2
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Feb 17 12:05:05 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Mar 10 14:02:45 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/api/io/avro/AvroPojoTest.java  | 68 +++++++++++++++++++-
 .../api/java/typeutils/GenericAvroTypeInfo.java | 40 ++++++++++++
 .../api/java/typeutils/GenericTypeInfo.java     | 11 +++-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  2 +-
 .../java/typeutils/runtime/AvroSerializer.java  | 25 ++++++-
 5 files changed, 140 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
index b91a3d8..b3321c8 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -22,8 +22,12 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.AvroInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.GenericAvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -51,7 +55,6 @@ public class AvroPojoTest extends JavaProgramTestBase {
 	}
 
 	private File inFile;
-	private String expected;
 
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -97,7 +100,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
 		return "";
 	}
 
-	private static int NUM_PROGRAMS = 3;
+	private static int NUM_PROGRAMS = 5;
 
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -146,6 +149,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
 				in = new Path(inFile.getAbsoluteFile().toURI());
 
 				AvroInputFormat<User> users1 = new AvroInputFormat<User>(in, User.class);
+				Assert.assertTrue(users1.getProducedType() instanceof PojoTypeInfo);
 				DataSet<User> usersDS1 = env.createInput(users1)
 						// null map type because the order changes in different JVMs (hard to test)
 						.map(new MapFunction<User, User>() {
@@ -163,7 +167,65 @@ public class AvroPojoTest extends JavaProgramTestBase {
 
 				return "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" +
 						"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n";
+			case 4:
+				/**
+				 * Test GenericTypeInfo with Avro serialization.
+				 */
+				env = ExecutionEnvironment.getExecutionEnvironment();
+				GenericTypeInfo.USE_AVRO_SERIALIZER = true;
+				in = new Path(inFile.getAbsoluteFile().toURI());
+
+				AvroInputFormat<User> users2 = new AvroInputFormat<User>(in, User.class);
+				DataSet<User> usersDS2 = env.createInput(users2, new GenericTypeInfo<User>(User.class));
+
+				DataSet<Tuple2<String, Integer>> res2 = usersDS2.groupBy(new KeySelector<User, String>() {
+					@Override
+					public String getKey(User value) throws Exception {
+						return String.valueOf(value.getName());
+					}
+				}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+					@Override
+					public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (User u : values) {
+							out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+						}
+					}
+				});
 
+				res2.writeAsText(resultPath);
+				env.execute("Avro Key selection");
+
+
+				return "(Alyssa,1)\n(Charlie,1)\n";
+			case 5:
+				/**
+				 * Test GenericAvroTypeInfo with Avro serialization.
+				 */
+				env = ExecutionEnvironment.getExecutionEnvironment();
+				in = new Path(inFile.getAbsoluteFile().toURI());
+
+				AvroInputFormat<User> users3 = new AvroInputFormat<User>(in, User.class);
+				DataSet<User> usersDS3 = env.createInput(users3, new GenericAvroTypeInfo<User>(User.class));
+
+				DataSet<Tuple2<String, Integer>> res3 = usersDS3.groupBy(new KeySelector<User, String>() {
+					@Override
+					public String getKey(User value) throws Exception {
+						return String.valueOf(value.getName());
+					}
+				}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+					@Override
+					public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+						for (User u : values) {
+							out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+						}
+					}
+				});
+
+				res3.writeAsText(resultPath);
+				env.execute("Avro Key selection");
+
+
+				return "(Alyssa,1)\n(Charlie,1)\n";
 			default:
 				throw new RuntimeException("Unknown test");
 		}
@@ -176,7 +238,7 @@ public class AvroPojoTest extends JavaProgramTestBase {
 	}
 
 	@Parameterized.Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+	public static Collection<Object[]> getConfigurations() throws IOException {
 
 		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
new file mode 100644
index 0000000..6e3eebd
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericAvroTypeInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
+
+
+/**
+ * TypeInformation like the regular GenericTypeInfo (Kryo by default), except
+ * that createSerializer() always returns an AvroSerializer for the type class.
+ */
+public class GenericAvroTypeInfo<T> extends GenericTypeInfo<T> {
+
+	public GenericAvroTypeInfo(Class<T> typeClass) {
+		super(typeClass);
+	}
+
+	@Override
+	public TypeSerializer<T> createSerializer() {
+		return new AvroSerializer<T>(getTypeClass());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 5bc6cb9..30688b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
 import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 
@@ -29,9 +30,13 @@ import org.apache.flink.api.java.typeutils.runtime.KryoSerializer;
 public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
 
 	private final Class<T> typeClass;
+	public static boolean USE_AVRO_SERIALIZER = false;
+
+	private boolean useAvroSerializer;
 
 	public GenericTypeInfo(Class<T> typeClass) {
 		this.typeClass = typeClass;
+		this.useAvroSerializer = USE_AVRO_SERIALIZER;
 	}
 	
 	@Override
@@ -66,7 +71,11 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 
 	@Override
 	public TypeSerializer<T> createSerializer() {
-		return new KryoSerializer<T>(this.typeClass);
+		if(useAvroSerializer) {
+			return new AvroSerializer<T>(this.typeClass);
+		} else {
+			return new KryoSerializer<T>(this.typeClass);
+		}
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 0103a7b..30a8df2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -34,9 +34,9 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 
 import com.google.common.base.Joiner;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f18cbb3/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 2758bd6..d5573cb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -18,8 +18,15 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -91,7 +98,22 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	@Override
 	public T copy(T from) {
 		checkKryoInitialized();
-		return this.kryo.copy(from);
+		try {
+			return this.kryo.copy(from);
+		} catch(KryoException ke) {
+			// kryo was unable to copy it, so we do it through serialization:
+			ByteArrayOutputStream baout = new ByteArrayOutputStream();
+			Output output = new Output(baout);
+
+			kryo.writeObject(output, from);
+
+			output.close();
+
+			ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
+			Input input = new Input(bain);
+
+			return (T)kryo.readObject(input, from.getClass());
+		}
 	}
 	
 	@Override
@@ -154,6 +176,7 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new Kryo();
+			this.kryo.register(GenericData.Array.class, new KryoSerializer.SpecificInstanceCollectionSerializer(ArrayList.class));
 			this.kryo.setAsmEnabled(true);
 			this.kryo.register(type);
 		}