Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:32 UTC

[09/21] flink git commit: [FLINK-7420] [avro] Move all Avro code to flink-avro

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
deleted file mode 100644
index 6f03b12..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.Utf8;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-
-/**
- * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into Avro bytes.
- */
-public class AvroRowSerializationSchema implements SerializationSchema<Row> {
-
-	/**
-	 * Avro record class.
-	 */
-	private Class<? extends SpecificRecord> recordClazz;
-
-	/**
-	 * Avro serialization schema.
-	 */
-	private transient Schema schema;
-
-	/**
-	 * Writer to serialize Avro record into a byte array.
-	 */
-	private transient DatumWriter<GenericRecord> datumWriter;
-
-	/**
-	 * Output stream to serialize records into byte array.
-	 */
-	private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
-
-	/**
-	 * Low-level class for serialization of Avro values.
-	 */
-	private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
-
-	/**
-	 * Creates an Avro serialization schema for the given record class.
-	 *
-	 * @param recordClazz Avro record class used to serialize Flink's row into Avro's record
-	 */
-	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
-		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
-		this.recordClazz = recordClazz;
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumWriter = new SpecificDatumWriter<>(schema);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public byte[] serialize(Row row) {
-		// convert to record
-		final Object record = convertToRecord(schema, row);
-
-		// write
-		try {
-			arrayOutputStream.reset();
-			datumWriter.write((GenericRecord) record, encoder);
-			encoder.flush();
-			return arrayOutputStream.toByteArray();
-		} catch (IOException e) {
-			throw new RuntimeException("Failed to serialize Row.", e);
-		}
-	}
-
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.writeObject(recordClazz);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumWriter = new SpecificDatumWriter<>(schema);
-		this.arrayOutputStream = new ByteArrayOutputStream();
-		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
-	}
-
-	/**
-	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
-	 * Strings are converted into Avro's {@link Utf8} fields.
-	 */
-	private static Object convertToRecord(Schema schema, Object rowObj) {
-		if (rowObj instanceof Row) {
-			// records can be wrapped in a union
-			if (schema.getType() == Schema.Type.UNION) {
-				final List<Schema> types = schema.getTypes();
-				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
-					schema = types.get(1);
-				}
-				else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
-					schema = types.get(0);
-				}
-				else {
-				throw new RuntimeException("Currently we only support schemas of the following form: UNION[NULL, RECORD] or UNION[RECORD, NULL]. Given: " + schema);
-				}
-			} else if (schema.getType() != Schema.Type.RECORD) {
-				throw new RuntimeException("Record type for row type expected. But is: " + schema);
-			}
-			final List<Schema.Field> fields = schema.getFields();
-			final GenericRecord record = new GenericData.Record(schema);
-			final Row row = (Row) rowObj;
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				record.put(field.pos(), convertToRecord(field.schema(), row.getField(i)));
-			}
-			return record;
-		} else if (rowObj instanceof String) {
-			return new Utf8((String) rowObj);
-		} else {
-			return rowObj;
-		}
-	}
-}
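
The class deleted above is re-created by this commit in the new flink-avro module (the import rewrites later in this patch point to the org.apache.flink.formats.avro package). A minimal usage sketch, assuming the relocated class keeps the constructor shown above, and using the generated Address record and field values from the AvroTestUtils code further down in this patch:

	// sketch only: Address is the Avro-generated class used by the tests in this patch
	AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(Address.class);

	Row row = new Row(5);
	row.setField(0, 42);
	row.setField(1, "Main Street 42");
	row.setField(2, "Test City");
	row.setField(3, "Test State");
	row.setField(4, "12345");

	byte[] avroBytes = serializationSchema.serialize(row); // binary-encoded Avro record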

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
deleted file mode 100644
index 28f2ed3..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
-import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test for the Avro serialization and deserialization schema.
- */
-public class AvroRowDeSerializationSchemaTest {
-
-	@Test
-	public void testSerializeDeserializeSimpleRow() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
-
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
-		final byte[] bytes = serializationSchema.serialize(testData.f2);
-		final Row actual = deserializationSchema.deserialize(bytes);
-
-		assertEquals(testData.f2, actual);
-	}
-
-	@Test
-	public void testSerializeSimpleRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
-
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
-		serializationSchema.serialize(testData.f2);
-		serializationSchema.serialize(testData.f2);
-		final byte[] bytes = serializationSchema.serialize(testData.f2);
-		final Row actual = deserializationSchema.deserialize(bytes);
-
-		assertEquals(testData.f2, actual);
-	}
-
-	@Test
-	public void testDeserializeRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
-
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
-		final byte[] bytes = serializationSchema.serialize(testData.f2);
-		deserializationSchema.deserialize(bytes);
-		deserializationSchema.deserialize(bytes);
-		final Row actual = deserializationSchema.deserialize(bytes);
-
-		assertEquals(testData.f2, actual);
-	}
-
-	@Test
-	public void testSerializeDeserializeComplexRow() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
-
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
-		final byte[] bytes = serializationSchema.serialize(testData.f2);
-		final Row actual = deserializationSchema.deserialize(bytes);
-
-		assertEquals(testData.f2, actual);
-	}
-
-	@Test
-	public void testSerializeComplexRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
-
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
-		serializationSchema.serialize(testData.f2);
-		serializationSchema.serialize(testData.f2);
-		final byte[] bytes = serializationSchema.serialize(testData.f2);
-		final Row actual = deserializationSchema.deserialize(bytes);
-
-		assertEquals(testData.f2, actual);
-	}
-
-	@Test
-	public void testDeserializeComplexRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
-
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
-		final byte[] bytes = serializationSchema.serialize(testData.f2);
-		deserializationSchema.deserialize(bytes);
-		deserializationSchema.deserialize(bytes);
-		final Row actual = deserializationSchema.deserialize(bytes);
-
-		assertEquals(testData.f2, actual);
-	}
-
-	@Test
-	public void testSerializability() throws IOException, ClassNotFoundException {
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
-
-		final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
-
-		byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
-		byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);
-
-		AvroRowSerializationSchema serCopy =
-			InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
-		AvroRowDeserializationSchema deserCopy =
-			InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());
-
-		final byte[] bytes = serCopy.serialize(testData.f2);
-		deserCopy.deserialize(bytes);
-		deserCopy.deserialize(bytes);
-		final Row actual = deserCopy.deserialize(bytes);
-
-		assertEquals(testData.f2, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
index def16b2..871a6f6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.table.api.Types;
 
 import org.apache.avro.Schema;

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
deleted file mode 100644
index a41125a..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.types.Row;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.specific.SpecificRecord;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-
-/**
- * Utilities for creating Avro Schemas.
- */
-public final class AvroTestUtils {
-
-	private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
-
-	/**
-	 * Creates a flat Avro Schema for testing.
-	 */
-	public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation[] fieldTypes) {
-		final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder
-			.record("BasicAvroRecord")
-			.namespace(NAMESPACE)
-			.fields();
-
-		final Schema nullSchema = Schema.create(Schema.Type.NULL);
-
-		for (int i = 0; i < fieldNames.length; i++) {
-			Schema schema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass());
-			Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, schema));
-			fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault();
-		}
-
-		return fieldAssembler.endRecord();
-	}
-
-	/**
-	 * Tests simple Avro data types without nesting.
-	 */
-	public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSimpleTestData() {
-		final Address addr = Address.newBuilder()
-			.setNum(42)
-			.setStreet("Main Street 42")
-			.setCity("Test City")
-			.setState("Test State")
-			.setZip("12345")
-			.build();
-
-		final Row rowAddr = new Row(5);
-		rowAddr.setField(0, 42);
-		rowAddr.setField(1, "Main Street 42");
-		rowAddr.setField(2, "Test City");
-		rowAddr.setField(3, "Test State");
-		rowAddr.setField(4, "12345");
-
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
-		t.f0 = Address.class;
-		t.f1 = addr;
-		t.f2 = rowAddr;
-
-		return t;
-	}
-
-	/**
-	 * Tests all Avro data types as well as nested types.
-	 */
-	public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getComplexTestData() {
-		final Address addr = Address.newBuilder()
-			.setNum(42)
-			.setStreet("Main Street 42")
-			.setCity("Test City")
-			.setState("Test State")
-			.setZip("12345")
-			.build();
-
-		final Row rowAddr = new Row(5);
-		rowAddr.setField(0, 42);
-		rowAddr.setField(1, "Main Street 42");
-		rowAddr.setField(2, "Test City");
-		rowAddr.setField(3, "Test State");
-		rowAddr.setField(4, "12345");
-
-		final User user = User.newBuilder()
-			.setName("Charlie")
-			.setFavoriteNumber(null)
-			.setFavoriteColor("blue")
-			.setTypeLongTest(1337L)
-			.setTypeDoubleTest(1.337d)
-			.setTypeNullTest(null)
-			.setTypeBoolTest(false)
-			.setTypeArrayString(new ArrayList<CharSequence>())
-			.setTypeArrayBoolean(new ArrayList<Boolean>())
-			.setTypeNullableArray(null)
-			.setTypeEnum(Colors.RED)
-			.setTypeMap(new HashMap<CharSequence, Long>())
-			.setTypeFixed(null)
-			.setTypeUnion(null)
-			.setTypeNested(addr)
-			.build();
-
-		final Row rowUser = new Row(15);
-		rowUser.setField(0, "Charlie");
-		rowUser.setField(1, null);
-		rowUser.setField(2, "blue");
-		rowUser.setField(3, 1337L);
-		rowUser.setField(4, 1.337d);
-		rowUser.setField(5, null);
-		rowUser.setField(6, false);
-		rowUser.setField(7, new ArrayList<CharSequence>());
-		rowUser.setField(8, new ArrayList<Boolean>());
-		rowUser.setField(9, null);
-		rowUser.setField(10, Colors.RED);
-		rowUser.setField(11, new HashMap<CharSequence, Long>());
-		rowUser.setField(12, null);
-		rowUser.setField(13, null);
-		rowUser.setField(14, rowAddr);
-
-		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
-		t.f0 = User.class;
-		t.f1 = user;
-		t.f2 = rowUser;
-
-		return t;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 97c9f20..7468b67 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -36,7 +36,6 @@ under the License.
 	<packaging>pom</packaging>
 
 	<modules>
-		<module>flink-avro</module>
 		<module>flink-jdbc</module>
 		<module>flink-hadoop-compatibility</module>
 		<module>flink-hbase</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ae3f56e..0ca742c 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,12 +80,6 @@ under the License.
 			<!-- managed version -->
 		</dependency>
 
-		<!-- Avro is needed for the interoperability with Avro types for serialization -->
-		<dependency>
-			<groupId>org.apache.avro</groupId>
-			<artifactId>avro</artifactId>
-		</dependency>
-
 		<!-- We explicitly depend on snappy since connectors that require it load it through the system class loader -->
 		<dependency>
 			<groupId>org.xerial.snappy</groupId>
@@ -128,7 +122,7 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-    </dependencies>
+	</dependencies>
 
 	<profiles>
 		<profile>
@@ -209,6 +203,7 @@ under the License.
 							<exclude>org.apache.flink.core.fs.FileSystem#isFlinkSupportedScheme(java.lang.String)</exclude>
 							<exclude>org.apache.flink.core.fs.FileSystem#setDefaultScheme(org.apache.flink.configuration.Configuration)</exclude>
 							<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
+							<exclude>org.apache.flink.api.java.typeutils.AvroTypeInfo</exclude>
 							<!-- Breaking changes between 1.1 and 1.2.
 							We ignore these changes because these are low-level, internal runtime configuration parameters -->
 							<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index fc66ccd..88d524e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -571,16 +571,24 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	}
 
 	/**
-	 * Force Flink to use the AvroSerializer for POJOs.
+	 * Forces Flink to use the Apache Avro serializer for POJOs.
+	 *
+	 * <b>Important:</b> Make sure to include the <i>flink-avro</i> module.
 	 */
 	public void enableForceAvro() {
 		forceAvro = true;
 	}
 
+	/**
+	 * Disables the Apache Avro serializer as the forced serializer for POJOs.
+	 */
 	public void disableForceAvro() {
 		forceAvro = false;
 	}
 
+	/**
+	 * Returns whether the Apache Avro serializer is forced for POJOs.
+	 */
 	public boolean isForceAvroEnabled() {
 		return forceAvro;
 	}
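
A short sketch of how the forced Avro serializer is enabled from a job; after this commit the flag only takes effect if flink-avro is on the classpath, otherwise serializer creation fails with the RuntimeException added to PojoTypeInfo below:

	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.getConfig().enableForceAvro(); // POJO serializers are now loaded reflectively from flink-avro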

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
deleted file mode 100644
index 1356e53..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Special type information to generate an AvroTypeInfo for Avro POJOs (classes implementing SpecificRecordBase, the typed Avro POJOs).
- *
- * Procedure: it runs a regular POJO type analysis and replaces every {@code GenericType<CharSequence>}
- *     with a {@code GenericType<avro.Utf8>}.
- * All other types used by Avro are standard Java types.
- * Only strings are represented as CharSequence fields and as Utf8 classes at runtime.
- * CharSequence is not comparable. To make such fields usable with field expressions, we replace them here
- * by generic type infos containing Utf8 classes (which are comparable).
- *
- * This class is checked by the AvroPojoTest.
- * @param <T>
- */
-@Public
-public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
-	@PublicEvolving
-	public AvroTypeInfo(Class<T> typeClass) {
-		super(typeClass, generateFieldsFromAvroSchema(typeClass));
-	}
-
-	private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
-		PojoTypeExtractor pte = new PojoTypeExtractor();
-		ArrayList<Type> typeHierarchy = new ArrayList<>();
-		typeHierarchy.add(typeClass);
-		TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
-
-		if(!(ti instanceof PojoTypeInfo)) {
-			throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
-		}
-		PojoTypeInfo pti =  (PojoTypeInfo) ti;
-		List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
-
-		for(int i = 0; i < pti.getArity(); i++) {
-			PojoField f = pti.getPojoFieldAt(i);
-			TypeInformation newType = f.getTypeInformation();
-			// check if type is a CharSequence
-			if(newType instanceof GenericTypeInfo) {
-				if((newType).getTypeClass().equals(CharSequence.class)) {
-					// replace the type by a org.apache.avro.util.Utf8
-					newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
-				}
-			}
-			PojoField newField = new PojoField(f.getField(), newType);
-			newFields.add(newField);
-		}
-		return newFields;
-	}
-
-	private static class PojoTypeExtractor extends TypeExtractor {
-		private PojoTypeExtractor() {
-			super();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 8a4fbbe..b24f425 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -27,12 +27,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
 import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -300,15 +301,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
 	@Override
 	@PublicEvolving
+	@SuppressWarnings("unchecked")
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
 		if(config.isForceKryoEnabled()) {
-			return new KryoSerializer<T>(getTypeClass(), config);
+			return new KryoSerializer<>(getTypeClass(), config);
 		}
+
 		if(config.isForceAvroEnabled()) {
-			return new AvroSerializer<T>(getTypeClass());
+			Class<?> clazz;
+			try {
+				clazz = Class.forName("org.apache.flink.formats.avro.typeutils.AvroSerializer");
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Could not load the AvroSerializer class. " +
+					"You may be missing the 'flink-avro' dependency.");
+			}
+
+			try {
+				Constructor<?> constructor = clazz.getConstructor(Class.class);
+				return (TypeSerializer<T>) constructor.newInstance(getTypeClass());
+			} catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
+				throw new RuntimeException("Incompatible versions of the Avro classes found.");
+			} catch (InvocationTargetException e) {
+				throw new RuntimeException("Cannot create AvroSerializer.", e.getTargetException());
+			}
 		}
 
-		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ];
+		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
 		Field[] reflectiveFields = new Field[fields.length];
 
 		for (int i = 0; i < fields.length; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index 41d260d..c5c2565 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -287,4 +287,39 @@ public class TypeExtractionUtils {
 			((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName()) &&
 			((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration());
 	}
+
+	/**
+	 * Traverses the type hierarchy of a type up until a certain stop class is found.
+	 *
+	 * @param t the type for which a hierarchy needs to be created
+	 * @return type of the immediate child of the stop class
+	 */
+	public static Type getTypeHierarchy(List<Type> typeHierarchy, Type t, Class<?> stopAtClass) {
+		while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) {
+			typeHierarchy.add(t);
+			t = typeToClass(t).getGenericSuperclass();
+
+			if (t == null) {
+				break;
+			}
+		}
+		return t;
+	}
+
+	/**
+	 * Returns true if the given class has a superclass of the given name.
+	 *
+	 * @param clazz class to be analyzed
+	 * @param superClassName class name of the super class
+	 */
+	public static boolean hasSuperclass(Class<?> clazz, String superClassName) {
+		List<Type> hierarchy = new ArrayList<>();
+		getTypeHierarchy(hierarchy, clazz, Object.class);
+		for (Type t : hierarchy) {
+			if (isClassType(t) && typeToClass(t).getName().equals(superClassName)) {
+				return true;
+			}
+		}
+		return false;
+	}
 }
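
The new hasSuperclass utility lets callers test for Avro's SpecificRecordBase by name, without a compile-time dependency on Avro. A sketch of the check that TypeExtractor performs below; MyAvroRecord stands in for any Avro-generated class:

	// hypothetical generated class; the string matches AVRO_SPECIFIC_RECORD_BASE_CLASS below
	boolean isAvroRecord = TypeExtractionUtils.hasSuperclass(
			MyAvroRecord.class, "org.apache.avro.specific.SpecificRecordBase");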

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index c50dfc9..1a9cecb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
@@ -73,6 +72,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getTypeHierarchy;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
 import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
@@ -114,6 +115,10 @@ public class TypeExtractor {
 
 	private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = "org.apache.flink.api.java.typeutils.WritableTypeInfo";
 
+	private static final String AVRO_SPECIFIC_RECORD_BASE_CLASS = "org.apache.avro.specific.SpecificRecordBase";
+
+	private static final String AVRO_TYPEINFO_CLASS = "org.apache.flink.formats.avro.typeutils.AvroTypeInfo";
+
 	private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
 
 	public static final int[] NO_INDEX = new int[] {};
@@ -1583,24 +1588,6 @@ public class TypeExtractor {
 	}
 
 	/**
-	 * Traverses the type hierarchy of a type up until a certain stop class is found.
-	 *
-	 * @param t type for which a hierarchy need to be created
-	 * @return type of the immediate child of the stop class
-	 */
-	private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type t, Class<?> stopAtClass) {
-		while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) {
-			typeHierarchy.add(t);
-			t = typeToClass(t).getGenericSuperclass();
-
-			if (t == null) {
-				break;
-			}
-		}
-		return t;
-	}
-
-	/**
 	 * Traverses the type hierarchy up until a type information factory can be found.
 	 *
 	 * @param typeHierarchy hierarchy to be filled while traversing up
@@ -1806,8 +1793,8 @@ public class TypeExtractor {
 		}
 
 		// special case for POJOs generated by Avro.
-		if(SpecificRecordBase.class.isAssignableFrom(clazz)) {
-			return new AvroTypeInfo(clazz);
+		if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
+			return createAvroTypeInfo(clazz);
 		}
 
 		if (Modifier.isInterface(clazz.getModifiers())) {
@@ -2119,7 +2106,7 @@ public class TypeExtractor {
 	private static boolean hasHadoopWritableInterface(Class<?> clazz,  HashSet<Class<?>> alreadySeen) {
 		Class<?>[] interfaces = clazz.getInterfaces();
 		for (Class<?> c : interfaces) {
-			if (c.getName().equals("org.apache.hadoop.io.Writable")) {
+			if (c.getName().equals(HADOOP_WRITABLE_CLASS)) {
 				return true;
 			}
 			else if (alreadySeen.add(c) && hasHadoopWritableInterface(c, alreadySeen)) {
@@ -2155,7 +2142,7 @@ public class TypeExtractor {
 			throw new RuntimeException("Incompatible versions of the Hadoop Compatibility classes found.");
 		}
 		catch (InvocationTargetException e) {
-			throw new RuntimeException("Cannot create Hadoop Writable Type info", e.getTargetException());
+			throw new RuntimeException("Cannot create Hadoop WritableTypeInfo.", e.getTargetException());
 		}
 	}
 
@@ -2171,7 +2158,7 @@ public class TypeExtractor {
 				// this is actually a writable type info
 				// check if the type is a writable
 				if (!(type instanceof Class && isHadoopWritable((Class<?>) type))) {
-					throw new InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected");
+					throw new InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected.");
 				}
 
 				// check writable type contents
@@ -2188,4 +2175,33 @@ public class TypeExtractor {
 			// ignore
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities to handle Avro's 'SpecificRecord' type via reflection
+	// ------------------------------------------------------------------------
+
+	private static <T> TypeInformation<T> createAvroTypeInfo(Class<T> clazz) {
+		Class<?> typeInfoClass;
+		try {
+			typeInfoClass = Class.forName(AVRO_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
+		}
+		catch (ClassNotFoundException e) {
+			throw new RuntimeException("Could not load the TypeInformation for the class '"
+					+ AVRO_TYPEINFO_CLASS + "'. You may be missing the 'flink-avro' dependency.");
+		}
+
+		try {
+			Constructor<?> constr = typeInfoClass.getConstructor(Class.class);
+
+			@SuppressWarnings("unchecked")
+			TypeInformation<T> typeInfo = (TypeInformation<T>) constr.newInstance(clazz);
+			return typeInfo;
+		}
+		catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
+			throw new RuntimeException("Incompatible versions of the Avro classes found.");
+		}
+		catch (InvocationTargetException e) {
+			throw new RuntimeException("Cannot create AvroTypeInfo.", e.getTargetException());
+		}
+	}
 }
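
Taken together, type extraction for Avro-generated classes is unchanged for users, but AvroTypeInfo is now resolved reflectively at runtime. A sketch, where User is a hypothetical Avro-generated class:

	// with flink-avro on the classpath this returns an AvroTypeInfo; without it, the
	// RuntimeException above ("You may be missing the 'flink-avro' dependency.") is thrown
	TypeInformation<User> typeInfo = TypeExtractor.createTypeInfo(User.class);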

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
deleted file mode 100644
index 565bd4d..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.util.Preconditions;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
- *
- * @param <T> The type serialized.
- */
-@Internal
-public final class AvroSerializer<T> extends TypeSerializer<T> {
-
-	private static final long serialVersionUID = 1L;
-	
-	private final Class<T> type;
-	
-	private final Class<? extends T> typeToInstantiate;
-
-	/**
-	 * Map of class tag (using classname as tag) to their Kryo registration.
-	 *
-	 * <p>This map serves as a preview of the final registration result of
-	 * the Kryo instance, taking into account registration overwrites.
-	 */
-	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
-	
-	private transient ReflectDatumWriter<T> writer;
-	private transient ReflectDatumReader<T> reader;
-	
-	private transient DataOutputEncoder encoder;
-	private transient DataInputDecoder decoder;
-	
-	private transient Kryo kryo;
-	
-	private transient T deepCopyInstance;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public AvroSerializer(Class<T> type) {
-		this(type, type);
-	}
-	
-	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
-		this.type = checkNotNull(type);
-		this.typeToInstantiate = checkNotNull(typeToInstantiate);
-		
-		InstantiationUtil.checkForInstantiation(typeToInstantiate);
-
-		this.kryoRegistrations = buildKryoRegistrations(type);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public AvroSerializer<T> duplicate() {
-		return new AvroSerializer<T>(type, typeToInstantiate);
-	}
-	
-	@Override
-	public T createInstance() {
-		return InstantiationUtil.instantiate(this.typeToInstantiate);
-	}
-
-	@Override
-	public T copy(T from) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, kryo, this);
-	}
-	
-	@Override
-	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, reuse, kryo, this);
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(T value, DataOutputView target) throws IOException {
-		checkAvroInitialized();
-		this.encoder.setOut(target);
-		this.writer.write(value, this.encoder);
-	}
-	
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		checkAvroInitialized();
-		this.decoder.setIn(source);
-		return this.reader.read(null, this.decoder);
-	}
-
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		checkAvroInitialized();
-		this.decoder.setIn(source);
-		return this.reader.read(reuse, this.decoder);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		checkAvroInitialized();
-		
-		if (this.deepCopyInstance == null) {
-			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
-		}
-		
-		this.decoder.setIn(source);
-		this.encoder.setOut(target);
-		
-		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-		this.writer.write(tmp, this.encoder);
-	}
-	
-	
-	private void checkAvroInitialized() {
-		if (this.reader == null) {
-			this.reader = new ReflectDatumReader<T>(type);
-			this.writer = new ReflectDatumWriter<T>(type);
-			this.encoder = new DataOutputEncoder();
-			this.decoder = new DataInputDecoder();
-		}
-	}
-	
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			kryo.setAsmEnabled(true);
-
-			KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof AvroSerializer) {
-			@SuppressWarnings("unchecked")
-			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
-
-			return avroSerializer.canEqual(this) &&
-				type == avroSerializer.type &&
-				typeToInstantiate == avroSerializer.typeToInstantiate;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof AvroSerializer;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Serializer configuration snapshotting & compatibility
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-			final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
-
-			if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
-				// resolve Kryo registrations; currently, since the Kryo registrations in Avro
-				// are fixed, there shouldn't be a problem with the resolution here.
-
-				LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
-				oldRegistrations.putAll(kryoRegistrations);
-
-				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
-					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
-						return CompatibilityResult.requiresMigration();
-					}
-				}
-
-				this.kryoRegistrations = oldRegistrations;
-				return CompatibilityResult.compatible();
-			}
-		}
-
-		// ends up here if the preceding serializer is not
-		// an AvroSerializer, or if the serialized data type has changed
-		return CompatibilityResult.requiresMigration();
-	}
-
-	public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
-
-		private static final int VERSION = 1;
-
-		private Class<? extends T> typeToInstantiate;
-
-		public AvroSerializerConfigSnapshot() {}
-
-		public AvroSerializerConfigSnapshot(
-				Class<T> baseType,
-				Class<? extends T> typeToInstantiate,
-				LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
-
-			super(baseType, kryoRegistrations);
-			this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			super.write(out);
-
-			out.writeUTF(typeToInstantiate.getName());
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void read(DataInputView in) throws IOException {
-			super.read(in);
-
-			String classname = in.readUTF();
-			try {
-				typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
-			} catch (ClassNotFoundException e) {
-				throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
-			}
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-
-		public Class<? extends T> getTypeToInstantiate() {
-			return typeToInstantiate;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-
-		// kryoRegistrations may be null if this Avro serializer is deserialized from an old version
-		if (kryoRegistrations == null) {
-			this.kryoRegistrations = buildKryoRegistrations(type);
-		}
-	}
-
-	private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
-		final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();
-
-		// register Avro types.
-		registrations.put(
-				GenericData.Array.class.getName(),
-				new KryoRegistration(
-						GenericData.Array.class,
-						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
-		registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
-		registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
-		registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
-		registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));
-
-		// register the serialized data type
-		registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));
-
-		return registrations;
-	}
-}
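
The serializer deleted above is superseded by org.apache.flink.formats.avro.typeutils.AvroSerializer, the class that PojoTypeInfo now instantiates reflectively. A round-trip sketch, assuming the relocated class keeps the single-argument constructor invoked in the PojoTypeInfo change above (User is a hypothetical Avro-generated class):

	// DataOutputSerializer / DataInputDeserializer are Flink's in-memory
	// DataOutputView / DataInputView implementations
	AvroSerializer<User> serializer = new AvroSerializer<>(User.class);

	DataOutputSerializer out = new DataOutputSerializer(64);
	serializer.serialize(user, out);

	DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
	User copy = serializer.deserialize(in);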

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
deleted file mode 100644
index c0454c6..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.annotation.Internal;
-
-@Internal
-public class DataInputDecoder extends Decoder implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private transient Utf8 stringDecoder = new Utf8();
-	
-	
-	private transient DataInput in;
-	
-	
-	public void setIn(DataInput in) {
-		this.in = in;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// primitives
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void readNull() {}
-	
-
-	@Override
-	public boolean readBoolean() throws IOException {
-		return in.readBoolean();
-	}
-
-	@Override
-	public int readInt() throws IOException {
-		return in.readInt();
-	}
-
-	@Override
-	public long readLong() throws IOException {
-		return in.readLong();
-	}
-
-	@Override
-	public float readFloat() throws IOException {
-		return in.readFloat();
-	}
-
-	@Override
-	public double readDouble() throws IOException {
-		return in.readDouble();
-	}
-	
-	@Override
-	public int readEnum() throws IOException {
-		return readInt();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// bytes
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void readFixed(byte[] bytes, int start, int length) throws IOException {
-		in.readFully(bytes, start, length);
-	}
-	
-	@Override
-	public ByteBuffer readBytes(ByteBuffer old) throws IOException {
-		int length = readInt();
-		ByteBuffer result;
-		if (old != null && length <= old.capacity() && old.hasArray()) {
-			result = old;
-			result.clear();
-		} else {
-			result = ByteBuffer.allocate(length);
-		}
-		in.readFully(result.array(), result.arrayOffset() + result.position(), length);
-		result.limit(length);
-		return result;
-	}
-	
-	
-	@Override
-	public void skipFixed(int length) throws IOException {
-		skipBytes(length);
-	}
-	
-	@Override
-	public void skipBytes() throws IOException {
-		int num = readInt();
-		skipBytes(num);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// strings
-	// --------------------------------------------------------------------------------------------
-	
-	
-	@Override
-	public Utf8 readString(Utf8 old) throws IOException {
-		int length = readInt();
-		Utf8 result = (old != null ? old : new Utf8());
-		result.setByteLength(length);
-		
-		if (length > 0) {
-			in.readFully(result.getBytes(), 0, length);
-		}
-		
-		return result;
-	}
-
-	@Override
-	public String readString() throws IOException {
-		return readString(stringDecoder).toString();
-	}
-
-	@Override
-	public void skipString() throws IOException {
-		int len = readInt();
-		skipBytes(len);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// collection types
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public long readArrayStart() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long arrayNext() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long skipArray() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long readMapStart() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long mapNext() throws IOException {
-		return readVarLongCount(in);
-	}
-
-	@Override
-	public long skipMap() throws IOException {
-		return readVarLongCount(in);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// union
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public int readIndex() throws IOException {
-		return readInt();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// utils
-	// --------------------------------------------------------------------------------------------
-	
-	private void skipBytes(int num) throws IOException {
-		while (num > 0) {
-			num -= in.skipBytes(num);
-		}
-	}
-	
-	public static long readVarLongCount(DataInput in) throws IOException {
-		long value = in.readUnsignedByte();
-
-		if ((value & 0x80) == 0) {
-			return value;
-		}
-		else {
-			long curr;
-			int shift = 7;
-			value = value & 0x7f;
-			while (((curr = in.readUnsignedByte()) & 0x80) != 0){
-				value |= (curr & 0x7f) << shift;
-				shift += 7;
-			}
-			value |= curr << shift;
-			return value;
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// serialization
-	// --------------------------------------------------------------------------------------------
-	
-	private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
-		// Read in size, and any hidden stuff
-		s.defaultReadObject();
-		
-		this.stringDecoder = new Utf8();
-		this.in = null;
-	}
-}
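
readVarLongCount above decodes Avro's variable-length counts: seven data bits per byte, least-significant group first, with the high bit as a continuation flag. A worked sketch using the method as defined above (relocated to flink-avro by this commit):

	// the bytes 0x96 0x01 decode to 150: 0x96 contributes 0x16, 0x01 contributes 1 << 7
	DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[] { (byte) 0x96, 0x01 }));
	long count = DataInputDecoder.readVarLongCount(in); // == 150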

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
deleted file mode 100644
index c41b648..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.annotation.Internal;
-
-@Internal
-public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private transient DataOutput out;
-	
-	
-	public void setOut(DataOutput out) {
-		this.out = out;
-	}
-
-
-	@Override
-	public void flush() throws IOException {}
-
-	// --------------------------------------------------------------------------------------------
-	// primitives
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void writeNull() {}
-	
-
-	@Override
-	public void writeBoolean(boolean b) throws IOException {
-		out.writeBoolean(b);
-	}
-
-	@Override
-	public void writeInt(int n) throws IOException {
-		out.writeInt(n);
-	}
-
-	@Override
-	public void writeLong(long n) throws IOException {
-		out.writeLong(n);
-	}
-
-	@Override
-	public void writeFloat(float f) throws IOException {
-		out.writeFloat(f);
-	}
-
-	@Override
-	public void writeDouble(double d) throws IOException {
-		out.writeDouble(d);
-	}
-	
-	@Override
-	public void writeEnum(int e) throws IOException {
-		out.writeInt(e);
-	}
-	
-	
-	// --------------------------------------------------------------------------------------------
-	// bytes
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void writeFixed(byte[] bytes, int start, int len) throws IOException {
-		out.write(bytes, start, len);
-	}
-	
-	@Override
-	public void writeBytes(byte[] bytes, int start, int len) throws IOException {
-		out.writeInt(len);
-		if (len > 0) {
-			out.write(bytes, start, len);
-		}
-	}
-	
-	@Override
-	public void writeBytes(ByteBuffer bytes) throws IOException {
-		int num = bytes.remaining();
-		out.writeInt(num);
-		
-		if (num > 0) {
-			writeFixed(bytes);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// strings
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void writeString(String str) throws IOException {
-		byte[] bytes = Utf8.getBytesFor(str);
-		writeBytes(bytes, 0, bytes.length);
-	}
-	
-	@Override
-	public void writeString(Utf8 utf8) throws IOException {
-		writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-		
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// collection types
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void writeArrayStart() {}
-
-	@Override
-	public void setItemCount(long itemCount) throws IOException {
-		if (itemCount > 0) {
-			writeVarLongCount(out, itemCount);
-		}
-	}
-
-	@Override
-	public void startItem() {}
-
-	@Override
-	public void writeArrayEnd() throws IOException {
-		// write a single byte 0, shortcut for a var-length long of 0
-		out.write(0);
-	}
-
-	@Override
-	public void writeMapStart() {}
-
-	@Override
-	public void writeMapEnd() throws IOException {
-		// write a single byte 0, shortcut for a var-length long of 0
-		out.write(0);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// union
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void writeIndex(int unionIndex) throws IOException {
-		out.writeInt(unionIndex);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// utils
-	// --------------------------------------------------------------------------------------------
-		
-	
-	public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
-		if (val < 0) {
-			throw new IOException("Illegal count (must be non-negative): " + val);
-		}
-		
-		while ((val & ~0x7FL) != 0) {
-			out.write(((int) val) | 0x80);
-			val >>>= 7;
-		}
-		out.write((int) val);
-	}
-	
-	private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
-		// Read in size, and any hidden stuff
-		s.defaultReadObject();
-
-		this.out = null;
-	}
-}
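
Note that DataOutputEncoder is not Avro's standard binary encoding: writeInt
and writeLong delegate to fixed-width DataOutput calls (Avro's binary format
would zig-zag varint-encode them), and only the collection counts use the
var-length form; writeArrayEnd and writeMapEnd emit a single 0 byte, the
var-length encoding of a count of 0. A sketch of the count encoding in
isolation, mirroring writeVarLongCount above; the class name and sample value
are illustrative:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class VarLongCountEncodeDemo {

        // same logic as the deleted DataOutputEncoder.writeVarLongCount(DataOutput, long)
        static void writeVarLongCount(DataOutput out, long val) throws IOException {
            if (val < 0) {
                throw new IOException("Illegal count (must be non-negative): " + val);
            }
            while ((val & ~0x7FL) != 0) {
                out.write(((int) val) | 0x80); // low 7 bits, continuation bit set
                val >>>= 7;
            }
            out.write((int) val);              // final byte, continuation bit clear
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeVarLongCount(new DataOutputStream(bytes), 150);
            for (byte b : bytes.toByteArray()) {
                System.out.printf("0x%02X ", b); // prints: 0x96 0x01
            }
        }
    }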

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 6730136..269cf35 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -24,8 +24,6 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
-import org.apache.avro.generic.GenericData;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -406,7 +404,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 				}
 
 				// there's actually no way to tell if new Kryo serializers are compatible with
-				// the previous ones they overwrite; we can only signal compatibly and hope for the best
+				// the previous ones they overwrite; we can only signal compatibility and hope for the best
 				this.kryoRegistrations = reconfiguredRegistrations;
 				return CompatibilityResult.compatible();
 			}
@@ -478,11 +476,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 							registeredTypeWithSerializerEntry.getValue()));
 		}
 
-		kryoRegistrations.put(
-				GenericData.Array.class.getName(),
-				new KryoRegistration(
-						GenericData.Array.class,
-						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+		// add Avro support if flink-avro is available; a dummy otherwise
+		Serializers.addAvroGenericDataArrayRegistration(kryoRegistrations);
 
 		return kryoRegistrations;
 	}
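
The hunk above removes KryoSerializer's hard compile-time dependency on Avro:
instead of referencing GenericData.Array directly, the registration is now
resolved at runtime, and a dummy takes its place when flink-avro is not on
the classpath (the full implementation is in the Serializers diff below).
Sketched minimally with illustrative names, the pattern is to probe with
Class.forName and register a placeholder on failure, so the registration slot
stays occupied either way and older serializer snapshots still line up (see
the compatibility test added further below):

    import java.util.LinkedHashMap;

    public class OptionalClassRegistrationDemo {

        /** Placeholder registered when the optional class is missing. */
        static final class Placeholder {}

        static void register(LinkedHashMap<String, Class<?>> registrations, String className) {
            try {
                // 'false': probe for the class without running its static initializers
                Class<?> clazz = Class.forName(
                        className, false, OptionalClassRegistrationDemo.class.getClassLoader());
                registrations.put(className, clazz);
            } catch (ClassNotFoundException e) {
                // keep the slot occupied so later registrations keep their position
                registrations.put(className, Placeholder.class);
            }
        }

        public static void main(String[] args) {
            LinkedHashMap<String, Class<?>> registrations = new LinkedHashMap<>();
            register(registrations, "org.apache.avro.generic.GenericData$Array");
            System.out.println(registrations);
        }
    }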

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 4976d6a..de7b2fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.specific.SpecificRecordBase;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,18 +25,29 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
 
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
+
 
 /**
  * Class containing utilities for the serializers of the Flink Runtime.
@@ -60,6 +61,14 @@ import java.util.Set;
 @Internal
 public class Serializers {
 
+	private static final String AVRO_SPECIFIC_RECORD_BASE = "org.apache.avro.specific.SpecificRecordBase";
+
+	private static final String AVRO_GENERIC_RECORD = "org.apache.avro.generic.GenericData$Record";
+
+	private static final String AVRO_KRYO_UTILS = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";
+
+	private static final String AVRO_GENERIC_DATA_ARRAY = "org.apache.avro.generic.GenericData$Array";
+
 	public static void recursivelyRegisterType(TypeInformation<?> typeInfo, ExecutionConfig config, Set<Class<?>> alreadySeen) {
 		if (typeInfo instanceof GenericTypeInfo) {
 			GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo;
@@ -94,8 +103,11 @@ public class Serializers {
 		}
 		else {
 			config.registerKryoType(type);
-			checkAndAddSerializerForTypeAvro(config, type);
-	
+			// add serializers for Avro type if necessary
+			if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(type, AVRO_GENERIC_RECORD)) {
+				addAvroSerializers(config, type);
+			}
+
 			Field[] fields = type.getDeclaredFields();
 			for (Field field : fields) {
 				if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
@@ -147,20 +159,54 @@ public class Serializers {
 			}
 		}
 	}
-	
-	// ------------------------------------------------------------------------
-	
-	private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) {
-		if (GenericData.Record.class.isAssignableFrom(type) || SpecificRecordBase.class.isAssignableFrom(type)) {
-			// Avro POJOs contain java.util.List fields which have GenericData.Array as their runtime type.
-			// Because Kryo is not able to serialize them properly, we use this serializer for them.
-			reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class);
-
-			// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
-			// Kryo is able to serialize everything in there, except for the Schema.
-			// This serializer is very slow, but serializing GenericData.Record with Kryo is in general a bad idea anyway.
-			// We add the serializer as a default serializer because Avro uses a private sub-type at runtime.
-			reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
+
+	/**
+	 * Loads the utility class from <code>flink-avro</code> and adds Avro-specific serializers.
+	 */
+	private static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
+		Class<?> clazz;
+		try {
+			clazz = Class.forName(AVRO_KRYO_UTILS, false, Serializers.class.getClassLoader());
+		}
+		catch (ClassNotFoundException e) {
+			throw new RuntimeException("Could not load class '" + AVRO_KRYO_UTILS + "'. " +
+				"You may be missing the 'flink-avro' dependency.");
+		}
+		try {
+			clazz.getDeclaredMethod("addAvroSerializers", ExecutionConfig.class, Class.class).invoke(null, reg, type);
+		} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+			throw new RuntimeException("Could not access method in 'flink-avro' dependency.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public static void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
+		try {
+			Class<?> clazz = Class.forName(AVRO_GENERIC_DATA_ARRAY, false, Serializers.class.getClassLoader());
+
+			kryoRegistrations.put(
+				AVRO_GENERIC_DATA_ARRAY,
+				new KryoRegistration(
+						clazz,
+						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+		}
+		catch (ClassNotFoundException e) {
+			kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY,
+				new KryoRegistration(DummyAvroRegisteredClass.class, (Class) DummyAvroKryoSerializerClass.class));
+		}
+	}
+
+	public static class DummyAvroRegisteredClass {}
+
+	public static class DummyAvroKryoSerializerClass<T> extends Serializer<T> {
+		@Override
+		public void write(Kryo kryo, Output output, Object o) {
+			throw new UnsupportedOperationException("Could not find required Avro dependency.");
+		}
+
+		@Override
+		public T read(Kryo kryo, Input input, Class<T> aClass) {
+			throw new UnsupportedOperationException("Could not find required Avro dependency.");
 		}
 	}
 
@@ -168,6 +214,9 @@ public class Serializers {
 	// Custom Serializers
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Special serializer for Java's {@link ArrayList} used for Avro's GenericData.Array.
+	 */
 	@SuppressWarnings("rawtypes")
 	public static class SpecificInstanceCollectionSerializerForArrayList extends SpecificInstanceCollectionSerializer<ArrayList> {
 		private static final long serialVersionUID = 1L;
@@ -176,19 +225,19 @@ public class Serializers {
 			super(ArrayList.class);
 		}
 	}
+
 	/**
 	 * Special serializer for Java collections enforcing certain instance types.
 	 * Avro serializes collections with a "GenericData.Array" type. Kryo is not able to handle
 	 * this type, so we use ArrayLists.
 	 */
 	@SuppressWarnings("rawtypes")
-	public static class SpecificInstanceCollectionSerializer<T extends Collection> 
-			extends CollectionSerializer implements Serializable
-	{
+	public static class SpecificInstanceCollectionSerializer<T extends Collection>
+			extends CollectionSerializer implements Serializable {
 		private static final long serialVersionUID = 1L;
-		
+
 		private Class<T> type;
-		
+
 		public SpecificInstanceCollectionSerializer(Class<T> type) {
 			this.type = type;
 		}
@@ -203,27 +252,4 @@ public class Serializers {
 			return kryo.newInstance(this.type);
 		}
 	}
-
-	/**
-	 * Slow serialization approach for Avro schemas.
-	 * This is only used with {@link org.apache.avro.generic.GenericData.Record} types.
-	 * With this serializer we are able to handle Avro records.
-	 */
-	public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void write(Kryo kryo, Output output, Schema object) {
-			String schemaAsString = object.toString(false);
-			output.writeString(schemaAsString);
-		}
-
-		@Override
-		public Schema read(Kryo kryo, Input input, Class<Schema> type) {
-			String schemaAsString = input.readString();
-			// the parser seems to be stateful, so we need a new one for every type.
-			Schema.Parser sParser = new Schema.Parser();
-			return sParser.parse(schemaAsString);
-		}
-	}
 }
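
The deleted AvroSchemaSerializer worked by writing a Schema as its JSON
string form and re-parsing it on read, which is why a fresh Schema.Parser is
needed per schema. A small round-trip sketch of that approach using plain
Avro APIs; it assumes Avro is on the classpath, and the sample schema is
illustrative:

    import org.apache.avro.Schema;

    public class SchemaStringRoundTrip {

        public static void main(String[] args) {
            Schema schema = Schema.create(Schema.Type.STRING);

            // write side: compact (non-pretty) JSON form, as in AvroSchemaSerializer.write
            String asJson = schema.toString(false);

            // read side: the parser is stateful, so use a fresh one per schema
            Schema parsed = new Schema.Parser().parse(asJson);

            System.out.println(schema.equals(parsed)); // prints true
        }
    }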

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
deleted file mode 100644
index 5b08e52..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
-	@Override
-	protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
-		return new AvroSerializer<T>(type);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
deleted file mode 100644
index 19fac43..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
-	@Override
-	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-		return new AvroSerializer<T>(type);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
deleted file mode 100644
index df1ff60..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
-
-	@Override
-	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-		return new AvroSerializer<T>(type);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
deleted file mode 100644
index 8a89410..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.junit.Test;
-
-public class AvroSerializerEmptyArrayTest {
-
-	@Test
-	public void testBookSerialization() {
-		try {
-			Book b = new Book(123, "This is a test book", 26382648);
-			AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class);
-			SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b);
-			test.testAll();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testSerialization() {
-		try {
-			List<String> titles = new ArrayList<String>();
-
-			List<Book> books = new ArrayList<Book>();
-			books.add(new Book(123, "This is a test book", 1));
-			books.add(new Book(24234234, "This is a test book", 1));
-			books.add(new Book(1234324, "This is a test book", 3));
-
-			BookAuthor a = new BookAuthor(1, titles, "Test Author");
-			a.books = books;
-			a.bookType = BookAuthor.BookType.journal;
-			
-			AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class);
-			
-			SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
-			test.testAll();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	public static class Book {
-
-		long bookId;
-		@Nullable
-		String title;
-		long authorId;
-
-		public Book() {}
-
-		public Book(long bookId, String title, long authorId) {
-			this.bookId = bookId;
-			this.title = title;
-			this.authorId = authorId;
-		}
-
-		@Override
-		public int hashCode() {
-			final int prime = 31;
-			int result = 1;
-			result = prime * result + (int) (authorId ^ (authorId >>> 32));
-			result = prime * result + (int) (bookId ^ (bookId >>> 32));
-			result = prime * result + ((title == null) ? 0 : title.hashCode());
-			return result;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (this == obj)
-				return true;
-			if (obj == null)
-				return false;
-			if (getClass() != obj.getClass())
-				return false;
-			Book other = (Book) obj;
-			if (authorId != other.authorId)
-				return false;
-			if (bookId != other.bookId)
-				return false;
-			if (title == null) {
-				if (other.title != null)
-					return false;
-			} else if (!title.equals(other.title))
-				return false;
-			return true;
-		}
-	}
-
-	public static class BookAuthor {
-
-		enum BookType {
-			book,
-			article,
-			journal
-		}
-
-		long authorId;
-
-		@Nullable
-		List<String> bookTitles;
-
-		@Nullable
-		List<Book> books;
-
-		String authorName;
-
-		BookType bookType;
-
-		public BookAuthor() {}
-
-		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
-			this.authorId = authorId;
-			this.bookTitles = bookTitles;
-			this.authorName = authorName;
-		}
-
-		@Override
-		public int hashCode() {
-			final int prime = 31;
-			int result = 1;
-			result = prime * result + (int) (authorId ^ (authorId >>> 32));
-			result = prime * result + ((authorName == null) ? 0 : authorName.hashCode());
-			result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode());
-			result = prime * result + ((bookType == null) ? 0 : bookType.hashCode());
-			result = prime * result + ((books == null) ? 0 : books.hashCode());
-			return result;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (this == obj)
-				return true;
-			if (obj == null)
-				return false;
-			if (getClass() != obj.getClass())
-				return false;
-			BookAuthor other = (BookAuthor) obj;
-			if (authorId != other.authorId)
-				return false;
-			if (authorName == null) {
-				if (other.authorName != null)
-					return false;
-			} else if (!authorName.equals(other.authorName))
-				return false;
-			if (bookTitles == null) {
-				if (other.bookTitles != null)
-					return false;
-			} else if (!bookTitles.equals(other.bookTitles))
-				return false;
-			if (bookType != other.bookType)
-				return false;
-			if (books == null) {
-				if (other.books != null)
-					return false;
-			} else if (!books.equals(other.books))
-				return false;
-			return true;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 5a404bd..1cacc9e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -18,20 +18,22 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -42,6 +44,20 @@ import static org.junit.Assert.assertTrue;
  */
 public class KryoSerializerCompatibilityTest {
 
+	@Test
+	public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
+		KryoSerializer<TestClass> kryoSerializerForA = new KryoSerializer<>(TestClass.class, new ExecutionConfig());
+
+		// read configuration again from bytes
+		TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot;
+		try (InputStream in = getClass().getResourceAsStream("/kryo-serializer-flink1.3-snapshot")) {
+			kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+		CompatibilityResult<TestClass> compatResult = kryoSerializerForA.ensureCompatibility(kryoSerializerConfigSnapshot);
+		assertFalse(compatResult.isRequiresMigration());
+	}
+
 	/**
 	 * Verifies that reconfiguration result is INCOMPATIBLE if data type has changed.
 	 */
@@ -60,7 +76,7 @@ public class KryoSerializerCompatibilityTest {
 		KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig());
 
 		// read configuration again from bytes
-		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
 			kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
@@ -103,7 +119,7 @@ public class KryoSerializerCompatibilityTest {
 		kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig);
 
 		// read configuration from bytes
-		try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+		try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
 			kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot b/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot
new file mode 100644
index 0000000..0123a9c
Binary files /dev/null and b/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot differ