You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/06 18:57:51 UTC

[1/3] flink git commit: [FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

Repository: flink
Updated Branches:
  refs/heads/master 88737cf9f -> c85f15ead


[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State

This falls back to the original serializer (Pojo / Kryo) in cases where
an old snapshot is resumed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3a2197a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3a2197a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3a2197a

Branch: refs/heads/master
Commit: f3a2197a23524048200ae2b4712d6ed833208124
Parents: 88737cf
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Nov 3 14:47:33 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 6 18:56:47 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/typeutils/PojoTypeInfo.java  |   6 +-
 .../flink/api/java/typeutils/AvroTypeInfo.java  |   2 +-
 .../java/typeutils/runtime/AvroSerializer.java  | 337 ++++++++++++++++
 .../formats/avro/typeutils/AvroSerializer.java  | 382 +++++++++++--------
 .../formats/avro/typeutils/AvroTypeInfo.java    |  91 +++--
 .../BackwardsCompatibleAvroSerializer.java      | 218 +++++++++++
 .../avro/typeutils/AvroSerializerTest.java      |  59 +++
 .../avro/typeutils/AvroTypeExtractionTest.java  |  14 +-
 .../avro/typeutils/AvroTypeInfoTest.java        |  12 +
 .../BackwardsCompatibleAvroSerializerTest.java  | 167 ++++++++
 .../formats/avro/utils/TestDataGenerator.java   | 120 ++++++
 .../flink-1.3-avro-type-serialized-data         | Bin 0 -> 23926 bytes
 .../flink-1.3-avro-type-serializer-snapshot     | Bin 0 -> 48089 bytes
 13 files changed, 1208 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 2e893bb..211b7ef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -309,6 +309,10 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 			return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
 		}
 
+		return createPojoSerializer(config);
+	}
+
+	public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
 		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
 		Field[] reflectiveFields = new Field[fields.length];
 
@@ -319,7 +323,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
 		return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
 	}
-	
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof PojoTypeInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
index 58085f6..03bacfa 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -33,6 +33,6 @@ import static org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateField
 public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
 
 	public AvroTypeInfo(Class<T> typeClass) {
-		super(typeClass, generateFieldsFromAvroSchema(typeClass));
+		super(typeClass, generateFieldsFromAvroSchema(typeClass, true));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
new file mode 100644
index 0000000..228e672
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import com.esotericsoftware.kryo.Kryo;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
+
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Old deprecated Avro serializer. It is retained for a smoother experience when
+ * upgrading from an earlier Flink savepoint that stored this serializer.
+ */
+@Internal
+@Deprecated
+@SuppressWarnings({"unused", "deprecation"})
+public final class AvroSerializer<T> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<T> type;
+
+	private final Class<? extends T> typeToInstantiate;
+
+	/**
+	 * Map of class tag (using classname as tag) to their Kryo registration.
+	 *
+	 * <p>This map serves as a preview of the final registration result of
+	 * the Kryo instance, taking into account registration overwrites.
+	 */
+	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
+	private transient ReflectDatumWriter<T> writer;
+	private transient ReflectDatumReader<T> reader;
+
+	private transient DataOutputEncoder encoder;
+	private transient DataInputDecoder decoder;
+
+	private transient Kryo kryo;
+
+	private transient T deepCopyInstance;
+
+	// --------------------------------------------------------------------------------------------
+
+	public AvroSerializer(Class<T> type) {
+		this(type, type);
+	}
+
+	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
+		this.type = checkNotNull(type);
+		this.typeToInstantiate = checkNotNull(typeToInstantiate);
+
+		InstantiationUtil.checkForInstantiation(typeToInstantiate);
+
+		this.kryoRegistrations = buildKryoRegistrations(type);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public AvroSerializer<T> duplicate() {
+		return new AvroSerializer<>(type, typeToInstantiate);
+	}
+
+	@Override
+	public T createInstance() {
+		return InstantiationUtil.instantiate(this.typeToInstantiate);
+	}
+
+	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+
+		return KryoUtils.copy(from, kryo, this);
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		checkKryoInitialized();
+
+		return KryoUtils.copy(from, reuse, kryo, this);
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(T value, DataOutputView target) throws IOException {
+		checkAvroInitialized();
+		this.encoder.setOut(target);
+		this.writer.write(value, this.encoder);
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		checkAvroInitialized();
+		this.decoder.setIn(source);
+		return this.reader.read(null, this.decoder);
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		checkAvroInitialized();
+		this.decoder.setIn(source);
+		return this.reader.read(reuse, this.decoder);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		checkAvroInitialized();
+
+		if (this.deepCopyInstance == null) {
+			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
+		}
+
+		this.decoder.setIn(source);
+		this.encoder.setOut(target);
+
+		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
+		this.writer.write(tmp, this.encoder);
+	}
+
+	private void checkAvroInitialized() {
+		if (this.reader == null) {
+			this.reader = new ReflectDatumReader<>(type);
+			this.writer = new ReflectDatumWriter<>(type);
+			this.encoder = new DataOutputEncoder();
+			this.decoder = new DataInputDecoder();
+		}
+	}
+
+	private void checkKryoInitialized() {
+		if (this.kryo == null) {
+			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+			kryo.setAsmEnabled(true);
+
+			KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof AvroSerializer) {
+			@SuppressWarnings("unchecked")
+			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
+
+			return avroSerializer.canEqual(this) &&
+					type == avroSerializer.type &&
+					typeToInstantiate == avroSerializer.typeToInstantiate;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof AvroSerializer;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+			final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
+				// resolve Kryo registrations; currently, since the Kryo registrations in Avro
+				// are fixed, there shouldn't be a problem with the resolution here.
+
+				LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
+				oldRegistrations.putAll(kryoRegistrations);
+
+				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
+					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
+						return CompatibilityResult.requiresMigration();
+					}
+				}
+
+				this.kryoRegistrations = oldRegistrations;
+				return CompatibilityResult.compatible();
+			}
+		}
+
+		// ends up here if the preceding serializer is not
+		// the ValueSerializer, or serialized data type has changed
+		return CompatibilityResult.requiresMigration();
+	}
+
+	/**
+	 * Config snapshot for this serializer.
+	 */
+	public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		private Class<? extends T> typeToInstantiate;
+
+		public AvroSerializerConfigSnapshot() {}
+
+		public AvroSerializerConfigSnapshot(
+				Class<T> baseType,
+				Class<? extends T> typeToInstantiate,
+				LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
+
+			super(baseType, kryoRegistrations);
+			this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+
+			out.writeUTF(typeToInstantiate.getName());
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+
+			String classname = in.readUTF();
+			try {
+				typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
+			} catch (ClassNotFoundException e) {
+				throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
+			}
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		public Class<? extends T> getTypeToInstantiate() {
+			return typeToInstantiate;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		// kryoRegistrations may be null if this Avro serializer is deserialized from an old version
+		if (kryoRegistrations == null) {
+			this.kryoRegistrations = buildKryoRegistrations(type);
+		}
+	}
+
+	private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
+		final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();
+
+		// register Avro types.
+		registrations.put(
+				GenericData.Array.class.getName(),
+				new KryoRegistration(
+						GenericData.Array.class,
+						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+		registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
+		registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
+		registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
+		registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));
+
+		// register the serialized data type
+		registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));
+
+		return registrations;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index 02f74f5..bc3369f 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -18,85 +18,93 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
 import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
 
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
+ * A serializer that serializes types via Avro.
  *
- * @param <T> The type serialized.
+ * <p>The serializer supports both efficient specific record serialization for
+ * types generated via Avro, as well as serialization via reflection
+ * (ReflectDatumReader / -Writer). The serializer instantiates them depending on
+ * the class of the type it should serialize.
+ *
+ * @param <T> The type to be serialized.
  */
-@Internal
-public final class AvroSerializer<T> extends TypeSerializer<T> {
+public class AvroSerializer<T> extends TypeSerializer<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	private final Class<T> type;
+	// -------- configuration fields, serializable -----------
 
-	private final Class<? extends T> typeToInstantiate;
+	/** The class of the type that is serialized by this serializer. */
+	private final Class<T> type;
 
-	/**
-	 * Map of class tag (using classname as tag) to their Kryo registration.
-	 *
-	 * <p>This map serves as a preview of the final registration result of
-	 * the Kryo instance, taking into account registration overwrites.
-	 */
-	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+	// -------- runtime fields, non-serializable, lazily initialized -----------
 
-	private transient ReflectDatumWriter<T> writer;
-	private transient ReflectDatumReader<T> reader;
+	private transient SpecificDatumWriter<T> writer;
+	private transient SpecificDatumReader<T> reader;
 
 	private transient DataOutputEncoder encoder;
 	private transient DataInputDecoder decoder;
 
-	private transient Kryo kryo;
+	private transient SpecificData avroData;
 
-	private transient T deepCopyInstance;
+	private transient Schema schema;
 
-	// --------------------------------------------------------------------------------------------
+	/** The serializer configuration snapshot, cached for efficiency. */
+	private transient AvroSchemaSerializerConfigSnapshot configSnapshot;
 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new AvroSerializer for the type indicated by the given class.
+	 */
 	public AvroSerializer(Class<T> type) {
-		this(type, type);
+		this.type = checkNotNull(type);
 	}
 
+	/**
+	 * @deprecated Use {@link AvroSerializer#AvroSerializer(Class)} instead.
+	 */
+	@Deprecated
+	@SuppressWarnings("unused")
 	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
-		this.type = checkNotNull(type);
-		this.typeToInstantiate = checkNotNull(typeToInstantiate);
+		this(type);
+	}
 
-		InstantiationUtil.checkForInstantiation(typeToInstantiate);
+	// ------------------------------------------------------------------------
 
-		this.kryoRegistrations = buildKryoRegistrations(type);
+	public Class<T> getType() {
+		return type;
 	}
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
 
 	@Override
 	public boolean isImmutableType() {
@@ -104,32 +112,17 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public AvroSerializer<T> duplicate() {
-		return new AvroSerializer<T>(type, typeToInstantiate);
-	}
-
-	@Override
-	public T createInstance() {
-		return InstantiationUtil.instantiate(this.typeToInstantiate);
-	}
-
-	@Override
-	public T copy(T from) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, kryo, this);
+	public int getLength() {
+		return -1;
 	}
 
-	@Override
-	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, reuse, kryo, this);
-	}
+	// ------------------------------------------------------------------------
+	//  Serialization
+	// ------------------------------------------------------------------------
 
 	@Override
-	public int getLength() {
-		return -1;
+	public T createInstance() {
+		return InstantiationUtil.instantiate(type);
 	}
 
 	@Override
@@ -153,111 +146,216 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 		return this.reader.read(reuse, this.decoder);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Copying
+	// ------------------------------------------------------------------------
+
 	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
+	public T copy(T from) {
 		checkAvroInitialized();
+		return avroData.deepCopy(schema, from);
+	}
 
-		if (this.deepCopyInstance == null) {
-			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
-		}
-
-		this.decoder.setIn(source);
-		this.encoder.setOut(target);
+	@Override
+	public T copy(T from, T reuse) {
+		return copy(from);
+	}
 
-		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-		this.writer.write(tmp, this.encoder);
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		T value = deserialize(source);
+		serialize(value, target);
 	}
 
-	private void checkAvroInitialized() {
-		if (this.reader == null) {
-			this.reader = new ReflectDatumReader<T>(type);
-			this.writer = new ReflectDatumWriter<T>(type);
-			this.encoder = new DataOutputEncoder();
-			this.decoder = new DataInputDecoder();
+	// ------------------------------------------------------------------------
+	//  Compatibility and Upgrades
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		if (configSnapshot == null) {
+			checkAvroInitialized();
+			configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
 		}
+		return configSnapshot;
 	}
 
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
+	@Override
+	@SuppressWarnings("deprecation")
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
+			// proper schema snapshot, can do the sophisticated schema-based compatibility check
+			final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+			final Schema lastSchema = new Schema.Parser().parse(schemaString);
 
-			kryo.setAsmEnabled(true);
+			checkAvroInitialized();
+			final SchemaPairCompatibility compatibility =
+					SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
 
-			KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
+			return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
+					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+		}
+		else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+			// old snapshot case, just compare the type
+			// we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
+			// only for object-to-object copies.
+			final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
+			return type.equals(old.getTypeClass()) ?
+					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+		}
+		else {
+			return CompatibilityResult.requiresMigration();
 		}
 	}
 
-	// --------------------------------------------------------------------------------------------
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		return new AvroSerializer<>(type);
+	}
 
 	@Override
 	public int hashCode() {
-		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
+		return 42 + type.hashCode();
 	}
 
 	@Override
 	public boolean equals(Object obj) {
-		if (obj instanceof AvroSerializer) {
-			@SuppressWarnings("unchecked")
-			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
-
-			return avroSerializer.canEqual(this) &&
-				type == avroSerializer.type &&
-				typeToInstantiate == avroSerializer.typeToInstantiate;
-		} else {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj != null && obj.getClass() == AvroSerializer.class) {
+			final AvroSerializer that = (AvroSerializer) obj;
+			return this.type == that.type;
+		}
+		else {
 			return false;
 		}
 	}
 
 	@Override
 	public boolean canEqual(Object obj) {
-		return obj instanceof AvroSerializer;
+		return obj.getClass() == this.getClass();
 	}
 
-	// --------------------------------------------------------------------------------------------
-	// Serializer configuration snapshotting & compatibility
-	// --------------------------------------------------------------------------------------------
-
 	@Override
-	public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
+	public String toString() {
+		return getClass().getName() + " (" + getType().getName() + ')';
 	}
 
-	@SuppressWarnings("unchecked")
-	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-			final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
+	// ------------------------------------------------------------------------
+	//  Initialization
+	// ------------------------------------------------------------------------
+
+	private void checkAvroInitialized() {
+		if (writer == null) {
+			initializeAvro();
+		}
+	}
+
+	private void initializeAvro() {
+		final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+		if (SpecificRecord.class.isAssignableFrom(type)) {
+			this.avroData = new SpecificData(cl);
+			this.schema = this.avroData.getSchema(type);
+			this.reader = new SpecificDatumReader<>(schema, schema, avroData);
+			this.writer = new SpecificDatumWriter<>(schema, avroData);
+		}
+		else {
+			final ReflectData reflectData = new ReflectData(cl);
+			this.avroData = reflectData;
+			this.schema = this.avroData.getSchema(type);
+			this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
+			this.writer = new ReflectDatumWriter<>(schema, reflectData);
+		}
+
+		this.encoder = new DataOutputEncoder();
+		this.decoder = new DataInputDecoder();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Serializer Snapshots
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A config snapshot for the Avro Serializer that stores the Avro Schema to check compatibility.
+	 */
+	public static final class AvroSchemaSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+		private String schemaString;
+
+		/**
+		 * Default constructor for instantiation via reflection.
+		 */
+		@SuppressWarnings("unused")
+		public AvroSchemaSerializerConfigSnapshot() {}
+
+		public AvroSchemaSerializerConfigSnapshot(String schemaString) {
+			this.schemaString = checkNotNull(schemaString);
+		}
+
+		public String getSchemaString() {
+			return schemaString;
+		}
+
+		// --- Serialization ---
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+			this.schemaString = in.readUTF();
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+			out.writeUTF(schemaString);
+		}
 
-			if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
-				// resolve Kryo registrations; currently, since the Kryo registrations in Avro
-				// are fixed, there shouldn't be a problem with the resolution here.
+		// --- Version ---
 
-				LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
-				oldRegistrations.putAll(kryoRegistrations);
+		@Override
+		public int getVersion() {
+			return 1;
+		}
 
-				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
-					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
-						return CompatibilityResult.requiresMigration();
-					}
-				}
+		// --- Utils ---
 
-				this.kryoRegistrations = oldRegistrations;
-				return CompatibilityResult.compatible();
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			}
+			else if (obj != null && obj.getClass() == AvroSchemaSerializerConfigSnapshot.class) {
+				final AvroSchemaSerializerConfigSnapshot that = (AvroSchemaSerializerConfigSnapshot) obj;
+				return this.schemaString.equals(that.schemaString);
+			}
+			else {
+				return false;
 			}
 		}
 
-		// ends up here if the preceding serializer is not
-		// the ValueSerializer, or serialized data type has changed
-		return CompatibilityResult.requiresMigration();
+		@Override
+		public int hashCode() {
+			return 11 + schemaString.hashCode();
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getName() + " (" + schemaString + ')';
+		}
 	}
 
 	/**
-	 * {@link TypeSerializerConfigSnapshot} for Avro.
+	 * The outdated config snapshot, retained for backwards compatibility.
+	 *
+	 * @deprecated The {@link AvroSchemaSerializerConfigSnapshot} should be used instead.
 	 */
+	@Deprecated
 	public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
 
 		private static final int VERSION = 1;
@@ -266,15 +364,6 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 
 		public AvroSerializerConfigSnapshot() {}
 
-		public AvroSerializerConfigSnapshot(
-			Class<T> baseType,
-			Class<? extends T> typeToInstantiate,
-			LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
-
-			super(baseType, kryoRegistrations);
-			this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
-		}
-
 		@Override
 		public void write(DataOutputView out) throws IOException {
 			super.write(out);
@@ -304,35 +393,4 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 			return typeToInstantiate;
 		}
 	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-
-		// kryoRegistrations may be null if this Avro serializer is deserialized from an old version
-		if (kryoRegistrations == null) {
-			this.kryoRegistrations = buildKryoRegistrations(type);
-		}
-	}
-
-	private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
-		final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();
-
-		// register Avro types.
-		registrations.put(
-				GenericData.Array.class.getName(),
-				new KryoRegistration(
-						GenericData.Array.class,
-						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
-		registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
-		registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
-		registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
-		registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));
-
-		// register the serialized data type
-		registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));
-
-		return registrations;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index ad6b06e..644ee50 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -33,6 +33,7 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs)
@@ -49,43 +50,83 @@ public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T>
 
 	private static final long serialVersionUID = 1L;
 
+	private static final ConcurrentHashMap<Thread, Boolean> IN_BACKWARDS_COMPATIBLE_MODE = new ConcurrentHashMap<>();
+
+	private final boolean useBackwardsCompatibleSerializer;
+
+	/**
+	 * Creates a new Avro type info for the given class.
+	 */
 	public AvroTypeInfo(Class<T> typeClass) {
-		super(typeClass, generateFieldsFromAvroSchema(typeClass));
+		this(typeClass, false);
+	}
+
+	/**
+	 * Creates a new Avro type info for the given class.
+	 *
+	 * <p>This constructor takes a flag to specify whether a serializer
+	 * that is backwards compatible with PoJo-style serialization of Avro types should be used.
+	 * That is only necessary, if one has a Flink 1.3 (or earlier) savepoint where Avro types
+	 * were stored in the checkpointed state. New Flink programs will never need this.
+	 */
+	public AvroTypeInfo(Class<T> typeClass, boolean useBackwardsCompatibleSerializer) {
+		super(typeClass, generateFieldsFromAvroSchema(typeClass, useBackwardsCompatibleSerializer));
+
+		final Boolean modeOnStack = IN_BACKWARDS_COMPATIBLE_MODE.get(Thread.currentThread());
+		this.useBackwardsCompatibleSerializer = modeOnStack == null ?
+				useBackwardsCompatibleSerializer : modeOnStack;
 	}
 
 	@Override
+	@SuppressWarnings("deprecation")
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-		return super.createSerializer(config);
+		return useBackwardsCompatibleSerializer ?
+				new BackwardsCompatibleAvroSerializer<>(getTypeClass()) :
+				new AvroSerializer<>(getTypeClass());
 	}
 
 	@SuppressWarnings("unchecked")
 	@Internal
-	public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
-		PojoTypeExtractor pte = new PojoTypeExtractor();
-		ArrayList<Type> typeHierarchy = new ArrayList<>();
-		typeHierarchy.add(typeClass);
-		TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
-
-		if (!(ti instanceof PojoTypeInfo)) {
-			throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
-		}
-		PojoTypeInfo pti =  (PojoTypeInfo) ti;
-		List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
-
-		for (int i = 0; i < pti.getArity(); i++) {
-			PojoField f = pti.getPojoFieldAt(i);
-			TypeInformation newType = f.getTypeInformation();
-			// check if type is a CharSequence
-			if (newType instanceof GenericTypeInfo) {
-				if ((newType).getTypeClass().equals(CharSequence.class)) {
-					// replace the type by a org.apache.avro.util.Utf8
-					newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+	public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(
+			Class<T> typeClass,
+			boolean useBackwardsCompatibleSerializer) {
+
+		final Thread currentThread = Thread.currentThread();
+		final boolean entryPoint =
+				IN_BACKWARDS_COMPATIBLE_MODE.putIfAbsent(currentThread, useBackwardsCompatibleSerializer) == null;
+
+		try {
+			PojoTypeExtractor pte = new PojoTypeExtractor();
+			ArrayList<Type> typeHierarchy = new ArrayList<>();
+			typeHierarchy.add(typeClass);
+			TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
+
+			if (!(ti instanceof PojoTypeInfo)) {
+				throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
+			}
+			PojoTypeInfo pti =  (PojoTypeInfo) ti;
+			List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
+
+			for (int i = 0; i < pti.getArity(); i++) {
+				PojoField f = pti.getPojoFieldAt(i);
+				TypeInformation newType = f.getTypeInformation();
+				// check if type is a CharSequence
+				if (newType instanceof GenericTypeInfo) {
+					if ((newType).getTypeClass().equals(CharSequence.class)) {
+						// replace the type by a org.apache.avro.util.Utf8
+						newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+					}
 				}
+				PojoField newField = new PojoField(f.getField(), newType);
+				newFields.add(newField);
+			}
+			return newFields;
+		}
+		finally {
+			if (entryPoint) {
+				IN_BACKWARDS_COMPATIBLE_MODE.remove(currentThread);
 			}
-			PojoField newField = new PojoField(f.getField(), newType);
-			newFields.add(newField);
 		}
-		return newFields;
 	}
 
 	private static class PojoTypeExtractor extends TypeExtractor {

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
new file mode 100644
index 0000000..e5eb5d8
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
+import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if
+ * it has to ensure compatibility with one of those.
+ *
+ * <p>This serializer is there only as a means to explicitly fall back to PoJo serialization
+ * in the case where an upgrade from an earlier savepoint was made.
+ *
+ * @param <T> The type to be serialized.
+ */
+@SuppressWarnings("deprecation")
+public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The type to serialize. */
+	private final Class<T> type;
+
+	/** The type serializer currently used. Avro by default. */
+	private TypeSerializer<T> serializer;
+
+	/**
+	 * Creates a new backwards-compatible Avro Serializer, for the given type.
+	 */
+	public BackwardsCompatibleAvroSerializer(Class<T> type) {
+		this.type = type;
+		this.serializer = new AvroSerializer<>(type);
+	}
+
+	/**
+	 * Private copy constructor.
+	 */
+	private BackwardsCompatibleAvroSerializer(Class<T> type, TypeSerializer<T> serializer) {
+		this.type = type;
+		this.serializer = serializer;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean isImmutableType() {
+		return serializer.isImmutableType();
+	}
+
+	@Override
+	public int getLength() {
+		return serializer.getLength();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Serialization
+	// ------------------------------------------------------------------------
+
+	@Override
+	public T createInstance() {
+		return serializer.createInstance();
+	}
+
+	@Override
+	public void serialize(T value, DataOutputView target) throws IOException {
+		serializer.serialize(value, target);
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return serializer.deserialize(source);
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return serializer.deserialize(reuse, source);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Copying
+	// ------------------------------------------------------------------------
+
+	@Override
+	public T copy(T from) {
+		return serializer.copy(from);
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return serializer.copy(from, reuse);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		serializer.copy(source, target);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		return new BackwardsCompatibleAvroSerializer<>(type, serializer.duplicate());
+	}
+
+	@Override
+	public int hashCode() {
+		return type.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		else if (obj != null && obj.getClass() == BackwardsCompatibleAvroSerializer.class) {
+			final BackwardsCompatibleAvroSerializer that = (BackwardsCompatibleAvroSerializer) obj;
+			return this.type == that.type && this.serializer.equals(that.serializer);
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj.getClass() == this.getClass();
+	}
+
+	@Override
+	public String toString() {
+		return getClass().getName() + " (" + type.getName() + ')';
+	}
+
+	// ------------------------------------------------------------------------
+	//  Configuration Snapshots and Upgrades
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerConfigSnapshot snapshotConfiguration() {
+		// we return the configuration of the actually used serializer here
+		return serializer.snapshotConfiguration();
+	}
+
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot ||
+				configSnapshot instanceof AvroSerializerConfigSnapshot) {
+
+			// avro serializer, nice :-)
+			checkState(serializer instanceof AvroSerializer,
+					"Serializer was changed backwards to PojoSerializer and now encounters AvroSerializer snapshot.");
+
+			return serializer.ensureCompatibility(configSnapshot);
+		}
+		else if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
+			// common previous case
+			checkState(SpecificRecordBase.class.isAssignableFrom(type),
+					"BackwardsCompatibleAvroSerializer resuming a state serialized " +
+							"via a PojoSerializer, but not for an Avro Specific Record");
+
+			final AvroTypeInfo<? extends SpecificRecordBase> typeInfo =
+					new AvroTypeInfo<>(type.asSubclass(SpecificRecordBase.class), true);
+
+			@SuppressWarnings("unchecked")
+			final TypeSerializer<T> pojoSerializer =
+					(TypeSerializer<T>) typeInfo.createPojoSerializer(new ExecutionConfig());
+			this.serializer = pojoSerializer;
+			return serializer.ensureCompatibility(configSnapshot);
+		}
+		else if (configSnapshot instanceof KryoRegistrationSerializerConfigSnapshot) {
+			// force-kryo old case common previous case
+			// we create a new Kryo Serializer with a blank execution config.
+			// registrations are anyways picked up from the snapshot.
+			serializer = new KryoSerializer<>(type, new ExecutionConfig());
+			return serializer.ensureCompatibility(configSnapshot);
+		}
+		else {
+			// completely incompatible type, needs migration
+			return CompatibilityResult.requiresMigration();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
new file mode 100644
index 0000000..0ab5868
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import java.util.Random;
+
+/**
+ * Tests for the {@link AvroSerializer} that test specific avro types.
+ */
+public class AvroSerializerTest extends SerializerTestBase<User> {
+
+	@Override
+	protected TypeSerializer<User> createSerializer() {
+		return new AvroSerializer<>(User.class);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<User> getTypeClass() {
+		return User.class;
+	}
+
+	@Override
+	protected User[] getTestData() {
+		final Random rnd = new Random();
+		final User[] users = new User[20];
+
+		for (int i = 0; i < users.length; i++) {
+			users[i] = TestDataGenerator.generateRandomUser(rnd);
+		}
+
+		return users;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index ae41031..fbabb95 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -82,21 +82,14 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 
 		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users)
-				// null map type because the order changes in different JVMs (hard to test)
-		.map(new MapFunction<User, User>() {
-			@Override
-			public User map(User value) throws Exception {
-				value.setTypeMap(null);
-				return value;
-			}
-		});
+				.map((value) -> value);
 
 		usersDS.writeAsText(resultPath);
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
+					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
 	}
 
 	@Test
@@ -107,7 +100,6 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 
 		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users)
-				// null map type because the order changes in different JVMs (hard to test)
 				.map(new MapFunction<User, User>() {
 					@Override
 					public User map(User value) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
index 79a4a45..371cd4f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfoTest.java
@@ -18,10 +18,16 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.User;
 
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for {@link AvroTypeInfo}.
  */
@@ -34,4 +40,10 @@ public class AvroTypeInfoTest extends TypeInformationTestBase<AvroTypeInfo<?>> {
 			new AvroTypeInfo<>(User.class),
 		};
 	}
+
+	@Test
+	public void testAvroByDefault() {
+		final TypeSerializer<User> serializer = new AvroTypeInfo<>(User.class).createSerializer(new ExecutionConfig());
+		assertTrue(serializer instanceof AvroSerializer);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
new file mode 100644
index 0000000..92395ba
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test ensures that state and state configuration created by Flink 1.3 Avro types
+ * that used the PojoSerializer still works.
+ *
+ * <p><b>Important:</b> Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3)
+ * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already.
+ * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types)
+ * works properly.
+ *
+ * <p>This test can be dropped once we drop backwards compatibility with Flink 1.3 snapshots.
+ */
+public class BackwardsCompatibleAvroSerializerTest {
+
+	private static final String SNAPSHOT_RESOURCE = "flink-1.3-avro-type-serializer-snapshot";
+
+	private static final String DATA_RESOURCE = "flink-1.3-avro-type-serialized-data";
+
+	@SuppressWarnings("unused")
+	private static final String SNAPSHOT_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + SNAPSHOT_RESOURCE;
+
+	@SuppressWarnings("unused")
+	private static final String DATA_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + DATA_RESOURCE;
+
+	private static final long RANDOM_SEED = 143065108437678L;
+
+	private static final int NUM_DATA_ENTRIES = 20;
+
+	@Test
+	public void testCompatibilityWithFlink_1_3() throws Exception {
+
+		// retrieve the old config snapshot
+
+		final TypeSerializer<User> serializer;
+		final TypeSerializerConfigSnapshot configSnapshot;
+
+		try (InputStream in = getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
+			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
+
+			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> deserialized =
+					TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
+							inView, getClass().getClassLoader());
+
+			assertEquals(1, deserialized.size());
+
+			@SuppressWarnings("unchecked")
+			final TypeSerializer<User> typedSerializer = (TypeSerializer<User>) deserialized.get(0).f0;
+
+			serializer = typedSerializer;
+			configSnapshot = deserialized.get(0).f1;
+		}
+
+		assertNotNull(serializer);
+		assertNotNull(configSnapshot);
+
+		assertTrue(serializer instanceof PojoSerializer);
+		assertTrue(configSnapshot instanceof PojoSerializerConfigSnapshot);
+
+		// sanity check for the test: check that the test data works with the original serializer
+		validateDeserialization(serializer);
+
+		// sanity check for the test: check that a PoJoSerializer and the original serializer work together
+		assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+
+		final TypeSerializer<User> newSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+		assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+
+		// deserialize the data and make sure this still works
+		validateDeserialization(newSerializer);
+
+		TypeSerializerConfigSnapshot nextSnapshot = newSerializer.snapshotConfiguration();
+		final TypeSerializer<User> nextSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+
+		assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
+
+		// deserialize the data and make sure this still works
+		validateDeserialization(nextSerializer);
+	}
+
+	private static void validateDeserialization(TypeSerializer<User> serializer) throws IOException {
+		final Random rnd = new Random(RANDOM_SEED);
+
+		try (InputStream in = BackwardsCompatibleAvroSerializerTest.class.getClassLoader()
+				.getResourceAsStream(DATA_RESOURCE)) {
+
+			final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
+
+			for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
+				final User deserialized = serializer.deserialize(inView);
+
+				// deterministically generate a reference record
+				final User reference = TestDataGenerator.generateRandomUser(rnd);
+
+				assertEquals(reference, deserialized);
+			}
+		}
+	}
+
+// run this code on a 1.3 (or earlier) branch to generate the test data
+//	public static void main(String[] args) throws Exception {
+//
+//		AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);
+//
+//		TypeSerializer<User> serializer = typeInfo.createPojoSerializer(new ExecutionConfig());
+//		TypeSerializerConfigSnapshot confSnapshot = serializer.snapshotConfiguration();
+//
+//		try (FileOutputStream fos = new FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) {
+//			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(fos);
+//
+//			TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+//					out,
+//					Collections.singletonList(
+//							new Tuple2<>(serializer, confSnapshot)));
+//		}
+//
+//		try (FileOutputStream fos = new FileOutputStream(DATA_RESOURCE_WRITER)) {
+//			final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(fos);
+//			final Random rnd = new Random(RANDOM_SEED);
+//
+//			for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
+//				serializer.serialize(TestDataGenerator.generateRandomUser(rnd), out);
+//			}
+//		}
+//	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
new file mode 100644
index 0000000..9a9061e
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.User;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Generator for random test data for the generated Avro User type.
+ */
+public class TestDataGenerator {
+
+	public static User generateRandomUser(Random rnd) {
+		return new User(
+				generateRandomString(rnd, 50),
+				rnd.nextBoolean() ? null : rnd.nextInt(),
+				rnd.nextBoolean() ? null : generateRandomString(rnd, 6),
+				rnd.nextBoolean() ? null : rnd.nextLong(),
+				rnd.nextDouble(),
+				null,
+				rnd.nextBoolean(),
+				generateRandomStringList(rnd, 20, 30),
+				generateRandomBooleanList(rnd, 20),
+				rnd.nextBoolean() ? null : generateRandomStringList(rnd, 20, 20),
+				generateRandomColor(rnd),
+				new HashMap<>(),
+				generateRandomFixed16(rnd),
+				generateRandomUnion(rnd),
+				generateRandomAddress(rnd));
+	}
+
+	public static Colors generateRandomColor(Random rnd) {
+		return Colors.values()[rnd.nextInt(Colors.values().length)];
+	}
+
+	public static Fixed16 generateRandomFixed16(Random rnd) {
+		if (rnd.nextBoolean()) {
+			return new Fixed16();
+		}
+		else {
+			byte[] bytes = new byte[16];
+			rnd.nextBytes(bytes);
+			return new Fixed16(bytes);
+		}
+	}
+
+	public static Address generateRandomAddress(Random rnd) {
+		return new Address(
+				rnd.nextInt(),
+				generateRandomString(rnd, 20),
+				generateRandomString(rnd, 20),
+				generateRandomString(rnd, 20),
+				generateRandomString(rnd, 20));
+	}
+
+	private static List<Boolean> generateRandomBooleanList(Random rnd, int maxEntries) {
+		final int num = rnd.nextInt(maxEntries + 1);
+		ArrayList<Boolean> list = new ArrayList<>();
+		for (int i = 0; i < num; i++) {
+			list.add(rnd.nextBoolean());
+		}
+		return list;
+	}
+
+	private static List<CharSequence> generateRandomStringList(Random rnd, int maxEntries, int maxLen) {
+		final int num = rnd.nextInt(maxEntries + 1);
+		ArrayList<CharSequence> list = new ArrayList<>();
+		for (int i = 0; i < num; i++) {
+			list.add(generateRandomString(rnd, maxLen));
+		}
+		return list;
+	}
+
+	private static String generateRandomString(Random rnd, int maxLen) {
+		char[] chars = new char[rnd.nextInt(maxLen + 1)];
+		for (int i = 0; i < chars.length; i++) {
+			chars[i] = (char) rnd.nextInt(Character.MAX_VALUE);
+		}
+		return new String(chars);
+	}
+
+	private static Object generateRandomUnion(Random rnd) {
+		if (rnd.nextBoolean()) {
+			if (rnd.nextBoolean()) {
+				return null;
+			} else {
+				return rnd.nextBoolean();
+			}
+		} else {
+			if (rnd.nextBoolean()) {
+				return rnd.nextLong();
+			} else {
+				return rnd.nextDouble();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
new file mode 100644
index 0000000..028c1e6
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a2197a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
new file mode 100644
index 0000000..5bfdf728
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot differ


[3/3] flink git commit: [FLINK-7997] [avro] Make Avro part of the user code space

Posted by se...@apache.org.
[FLINK-7997] [avro] Make Avro part of the user code space

By not setting Avro as 'provided', the build system will put it
into the user code fat jar, rather than assuming it will be part
of Flink's 'lib' folder.

That way Avro is loaded child-first through the user code class
loader, giving it independent separate copies per load that avoid
version conflicts and caching problems.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c85f15ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c85f15ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c85f15ea

Branch: refs/heads/master
Commit: c85f15ead50e9961e284eef50e5dc569560db022
Parents: 633907b
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 6 14:01:13 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 6 18:56:48 2017 +0100

----------------------------------------------------------------------
 flink-connectors/flink-connector-filesystem/pom.xml | 1 -
 flink-connectors/flink-connector-kafka-0.10/pom.xml | 1 -
 flink-connectors/flink-connector-kafka-0.11/pom.xml | 1 -
 flink-connectors/flink-connector-kafka-0.8/pom.xml  | 1 -
 flink-connectors/flink-connector-kafka-0.9/pom.xml  | 1 -
 flink-connectors/flink-connector-kafka-base/pom.xml | 1 -
 6 files changed, 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c85f15ea/flink-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml
index 7efe68e..d1904a2 100644
--- a/flink-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -61,7 +61,6 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 			<!-- Projects depending on this project, won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/c85f15ea/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index a01431a..b985418 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -86,7 +86,6 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 			<!-- Projects depending on this project, won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/c85f15ea/flink-connectors/flink-connector-kafka-0.11/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index f66a31d..c39c146 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -86,7 +86,6 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 			<!-- Projects depending on this project, won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/c85f15ea/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index 2b2fc34..3a398ac 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -77,7 +77,6 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 			<!-- Projects depending on this project, won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/c85f15ea/flink-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index ed322d3..94f4078 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -76,7 +76,6 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 			<!-- Projects depending on this project, won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/c85f15ea/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index 9743a01..aae5716 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -70,7 +70,6 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
-			<scope>provided</scope>
 			<!-- Projects depending on this project, won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>


[2/3] flink git commit: [hotfix] [avro] Define Avro version through variable

Posted by se...@apache.org.
[hotfix] [avro] Define Avro version through variable

Avro version is used multiple times (dependendies and plugins),
having a variable makes sure we use those in sync.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/633907b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/633907b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/633907b3

Branch: refs/heads/master
Commit: 633907b3b3f21261a6b82a4c72d0112a4b29432c
Parents: f3a2197
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 6 13:52:34 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 6 18:56:48 2017 +0100

----------------------------------------------------------------------
 flink-formats/flink-avro/pom.xml | 2 +-
 pom.xml                          | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/633907b3/flink-formats/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 8732a21..0313b05 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -144,7 +144,7 @@ under the License.
 			<plugin>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro-maven-plugin</artifactId>
-				<version>1.8.2</version>
+				<version>${avro.version}</version>
 				<executions>
 					<execution>
 						<phase>generate-sources</phase>

http://git-wip-us.apache.org/repos/asf/flink/blob/633907b3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 62ece5b..d537be9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,6 +118,7 @@ under the License.
 		<jackson.version>2.7.9</jackson.version>
 		<metrics.version>3.1.5</metrics.version>
 		<prometheus.version>0.0.26</prometheus.version>
+		<avro.version>1.8.2</avro.version>
 		<junit.version>4.12</junit.version>
 		<mockito.version>1.10.19</mockito.version>
 		<powermock.version>1.6.5</powermock.version>
@@ -286,7 +287,7 @@ under the License.
  			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro</artifactId>
-				<version>1.8.2</version>
+				<version>${avro.version}</version>
 			</dependency>
 
 			<!-- Make sure we use a consistent commons-cli version throughout the project -->