Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:31 UTC

[08/21] flink git commit: [FLINK-7420] [avro] Move all Avro code to flink-avro

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
index d916116..048e7ac 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
@@ -23,12 +23,11 @@ import java.util.StringTokenizer
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.connectors.twitter.TwitterSource
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
 import org.apache.flink.util.Collector
-import org.codehaus.jackson.JsonNode
-import org.codehaus.jackson.map.ObjectMapper
 
 import scala.collection.mutable.ListBuffer
 

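The hunk above swaps the old org.codehaus.jackson classes for Flink's relocated Jackson 2. As a rough illustration of code written against the shaded classes (a minimal Java sketch; the tweet literal and class name are made up for illustration):

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class ShadedJacksonSketch {
	public static void main(String[] args) throws Exception {
		// Same parse-then-inspect pattern as the example's FlatMapFunction.
		ObjectMapper mapper = new ObjectMapper();
		JsonNode tweet = mapper.readValue(
				"{\"user\":{\"lang\":\"en\"},\"text\":\"hello\"}", JsonNode.class);
		boolean isEnglish = tweet.has("user")
				&& tweet.get("user").get("lang").asText().equals("en");
		System.out.println("isEnglish=" + isEnglish + " text=" + tweet.get("text").asText());
	}
}
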
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
new file mode 100644
index 0000000..19d9129
--- /dev/null
+++ b/flink-formats/flink-avro/pom.xml
@@ -0,0 +1,280 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-avro_${scala.binary.version}</artifactId>
+	<name>flink-avro</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>1.8.2</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>create-test-dependency</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.formats.avro.testjar.AvroExternalJarProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>maven</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<!--Remove the AvroExternalJarProgram code from the test-classes directory since it mustn't be on the
+			classpath when running the tests, in order to actually test whether the user code class loader
+			is properly used.-->
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>remove-avroexternalprogram</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>clean</goal>
+						</goals>
+						<configuration>
+							<excludeDefaultDirectories>true</excludeDefaultDirectories>
+							<filesets>
+								<fileset>
+									<directory>${project.build.testOutputDirectory}</directory>
+									<includes>
+										<include>**/testjar/*.class</include>
+									</includes>
+								</fileset>
+							</filesets>
+						</configuration>
+					</execution>
+				</executions>
+				<configuration>
+					<filesets>
+						<fileset>
+							<directory>${project.basedir}/src/test/java/org/apache/flink/formats/avro/generated</directory>
+						</fileset>
+					</filesets>
+				</configuration>
+			</plugin>
+			<!-- Generate Test class from avro schema -->
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>1.8.2</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+							<testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Add Avro test classes to test jar in order to test AvroTypeInfo. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration combine.self="override">
+							<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+							<artifactSet>
+								<includes>
+									<include>org.codehaus.jackson:*</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>org.codehaus.jackson</pattern>
+									<shadedPattern>org.apache.flink.avro.shaded.org.codehaus.jackson</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-assembly-plugin</artifactId>
+										<versionRange>[2.4,)</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-clean-plugin</artifactId>
+										<versionRange>[1,)</versionRange>
+										<goals>
+											<goal>clean</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.avro</groupId>
+										<artifactId>avro-maven-plugin</artifactId>
+										<versionRange>[1.7.7,)</versionRange>
+										<goals>
+											<goal>schema</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+</project>

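With this POM in place, Avro support lives in its own module under flink-formats; note that the artifact id carries a Scala suffix because of the flink-streaming-java dependency. A sketch of the dependency a user project would declare to pick the module up (the 2.11 suffix is an assumption; the version matches the POM above):

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-avro_2.11</artifactId>
	<version>1.4-SNAPSHOT</version>
</dependency>
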
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..58085f6
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import static org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema;
+
+/**
+ * @deprecated Please use <code>org.apache.flink.formats.avro.typeutils.AvroTypeInfo</code>
+ * in the <code>flink-avro</code> module. This class will be removed in the near future.
+ */
+@Deprecated
+@Public
+public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
+
+	public AvroTypeInfo(Class<T> typeClass) {
+		super(typeClass, generateFieldsFromAvroSchema(typeClass));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
new file mode 100644
index 0000000..9b73ceb
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroInputFormat.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides a {@link FileInputFormat} for Avro records.
+ *
+ * @param <E>
+ *            the type of the result Avro record. If you specify
+ *            {@link GenericRecord} then the result will be returned as a
+ *            {@link GenericRecord}, so you do not have to know the schema ahead
+ *            of time.
+ */
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
+	CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
+
+	private final Class<E> avroValueType;
+
+	private boolean reuseAvroValue = true;
+
+	private transient DataFileReader<E> dataFileReader;
+
+	private transient long end;
+
+	private transient long recordsReadSinceLastSync;
+
+	private long lastSync = -1L;
+
+	public AvroInputFormat(Path filePath, Class<E> type) {
+		super(filePath);
+		this.avroValueType = type;
+	}
+
+	/**
+	 * Sets the flag whether to reuse the Avro value instance for all records.
+	 * By default, the input format reuses the Avro value.
+	 *
+	 * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
+	 */
+	public void setReuseAvroValue(boolean reuseAvroValue) {
+		this.reuseAvroValue = reuseAvroValue;
+	}
+
+	/**
+	 * If set, the InputFormat will only read entire files.
+	 */
+	public void setUnsplittable(boolean unsplittable) {
+		this.unsplittable = unsplittable;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Typing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TypeInformation<E> getProducedType() {
+		return TypeExtractor.getForClass(this.avroValueType);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Input Format Methods
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		dataFileReader = initReader(split);
+		dataFileReader.sync(split.getStart());
+		lastSync = dataFileReader.previousSync();
+	}
+
+	private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
+		DatumReader<E> datumReader;
+
+		if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
+			datumReader = new GenericDatumReader<E>();
+		} else {
+			datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
+				? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
+		}
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Opening split {}", split);
+		}
+
+		SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
+		DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
+		}
+
+		end = split.getStart() + split.getLength();
+		recordsReadSinceLastSync = 0;
+		return dataFileReader;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
+	}
+
+	public long getRecordsReadFromBlock() {
+		return this.recordsReadSinceLastSync;
+	}
+
+	@Override
+	public E nextRecord(E reuseValue) throws IOException {
+		if (reachedEnd()) {
+			return null;
+		}
+
+		// if we start a new block, then register the event, and
+		// restart the counter.
+		if (dataFileReader.previousSync() != lastSync) {
+			lastSync = dataFileReader.previousSync();
+			recordsReadSinceLastSync = 0;
+		}
+		recordsReadSinceLastSync++;
+
+		if (reuseAvroValue) {
+			return dataFileReader.next(reuseValue);
+		} else {
+			if (GenericRecord.class == avroValueType) {
+				return dataFileReader.next();
+			} else {
+				return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() throws IOException {
+		return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		try {
+			this.open(split);
+		} finally {
+			if (state.f0 != -1) {
+				lastSync = state.f0;
+				recordsReadSinceLastSync = state.f1;
+			}
+		}
+
+		if (lastSync != -1) {
+			// seek to the last sync point, then read and discard records until we
+			// reach the position we were at before the checkpoint
+			dataFileReader.seek(lastSync);
+			for (int i = 0; i < recordsReadSinceLastSync; i++) {
+				dataFileReader.next(null);
+			}
+		}
+	}
+}

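A minimal reading sketch for the input format above (the file path is an assumption; GenericRecord keeps the schema dynamic, so no generated classes are needed):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;

public class AvroReadSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// AvroInputFormat implements ResultTypeQueryable, so no explicit
		// TypeInformation needs to be passed to createInput().
		AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(
				new Path("file:///tmp/users.avro"), GenericRecord.class);

		env.createInput(format).first(10).print();
	}
}
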
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
new file mode 100644
index 0000000..c0b8073
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroOutputFormat.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.fs.Path;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link FileOutputFormat} for Avro records.
+ *
+ * @param <E> the type of the records written by this output format
+ */
+public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
+
+	/**
+	 * Wrapper which encapsulates the supported codec and a related serialization byte.
+	 */
+	public enum Codec {
+
+		NULL((byte) 0, CodecFactory.nullCodec()),
+		SNAPPY((byte) 1, CodecFactory.snappyCodec()),
+		BZIP2((byte) 2, CodecFactory.bzip2Codec()),
+		DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+		XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+		private byte codecByte;
+
+		private CodecFactory codecFactory;
+
+		Codec(final byte codecByte, final CodecFactory codecFactory) {
+			this.codecByte = codecByte;
+			this.codecFactory = codecFactory;
+		}
+
+		private byte getCodecByte() {
+			return codecByte;
+		}
+
+		private CodecFactory getCodecFactory() {
+			return codecFactory;
+		}
+
+		private static Codec forCodecByte(byte codecByte) {
+			for (final Codec codec : Codec.values()) {
+				if (codec.getCodecByte() == codecByte) {
+					return codec;
+				}
+			}
+			throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
+		}
+	}
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<E> avroValueType;
+
+	private transient Schema userDefinedSchema = null;
+
+	private transient Codec codec = null;
+
+	private transient DataFileWriter<E> dataFileWriter;
+
+	public AvroOutputFormat(Path filePath, Class<E> type) {
+		super(filePath);
+		this.avroValueType = type;
+	}
+
+	public AvroOutputFormat(Class<E> type) {
+		this.avroValueType = type;
+	}
+
+	@Override
+	protected String getDirectoryFileName(int taskNumber) {
+		return super.getDirectoryFileName(taskNumber) + ".avro";
+	}
+
+	public void setSchema(Schema schema) {
+		this.userDefinedSchema = schema;
+	}
+
+	/**
+	 * Sets the Avro codec used for compression.
+	 *
+	 * @param codec the Avro codec.
+	 */
+	public void setCodec(final Codec codec) {
+		this.codec = checkNotNull(codec, "codec can not be null");
+	}
+
+	@Override
+	public void writeRecord(E record) throws IOException {
+		dataFileWriter.append(record);
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+
+		DatumWriter<E> datumWriter;
+		Schema schema;
+		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
+			datumWriter = new SpecificDatumWriter<E>(avroValueType);
+			try {
+				schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
+			} catch (InstantiationException | IllegalAccessException e) {
+				throw new RuntimeException(e.getMessage(), e);
+			}
+		} else if (org.apache.avro.generic.GenericRecord.class.isAssignableFrom(avroValueType)) {
+			if (userDefinedSchema == null) {
+				throw new IllegalStateException("Schema must be set when using Generic Record");
+			}
+			datumWriter = new GenericDatumWriter<E>(userDefinedSchema);
+			schema = userDefinedSchema;
+		} else {
+			datumWriter = new ReflectDatumWriter<E>(avroValueType);
+			schema = ReflectData.get().getSchema(avroValueType);
+		}
+		dataFileWriter = new DataFileWriter<E>(datumWriter);
+		if (codec != null) {
+			dataFileWriter.setCodec(codec.getCodecFactory());
+		}
+		if (userDefinedSchema == null) {
+			dataFileWriter.create(schema, stream);
+		} else {
+			dataFileWriter.create(userDefinedSchema, stream);
+		}
+	}
+
+	private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+
+		if (codec != null) {
+			out.writeByte(codec.getCodecByte());
+		} else {
+			out.writeByte(-1);
+		}
+
+		if (userDefinedSchema != null) {
+			byte[] json = userDefinedSchema.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
+			out.writeInt(json.length);
+			out.write(json);
+		} else {
+			out.writeInt(0);
+		}
+	}
+
+	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		byte codecByte = in.readByte();
+		if (codecByte >= 0) {
+			setCodec(Codec.forCodecByte(codecByte));
+		}
+
+		int length = in.readInt();
+		if (length != 0) {
+			byte[] json = new byte[length];
+			in.readFully(json);
+
+			Schema schema = new Schema.Parser().parse(new String(json, ConfigConstants.DEFAULT_CHARSET));
+			setSchema(schema);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		dataFileWriter.flush();
+		dataFileWriter.close();
+		super.close();
+	}
+}

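And a matching writing sketch (the User POJO and the output path are made up; a plain POJO takes the reflect-based branch of open() above):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;

public class AvroWriteSketch {

	/** Plain POJO; the output format falls back to Avro's reflect writer for it. */
	public static class User {
		public String name;
		public int age;
		public User() {}
		public User(String name, int age) { this.name = name; this.age = age; }
	}

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<User> users = env.fromElements(new User("alice", 30), new User("bob", 25));

		AvroOutputFormat<User> format =
				new AvroOutputFormat<>(new Path("file:///tmp/users-avro"), User.class);
		format.setCodec(AvroOutputFormat.Codec.SNAPPY); // optional block compression

		users.output(format);
		env.execute("Avro write sketch");
	}
}
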
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
new file mode 100644
index 0000000..4a3c02e
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/**
+ * Deserialization schema that converts Avro bytes of a {@link SpecificRecord} into a {@link Row}.
+ *
+ * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
+ *
+ * <p>{@link Utf8} fields are converted to regular Java Strings.
+ */
+public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
+
+	/**
+	 * Avro record class.
+	 */
+	private Class<? extends SpecificRecord> recordClazz;
+
+	/**
+	 * Schema for deterministic field order.
+	 */
+	private transient Schema schema;
+
+	/**
+	 * Reader that deserializes byte array into a record.
+	 */
+	private transient DatumReader<SpecificRecord> datumReader;
+
+	/**
+	 * Input stream to read message from.
+	 */
+	private transient MutableByteArrayInputStream inputStream;
+
+	/**
+	 * Avro decoder that decodes binary data.
+	 */
+	private transient Decoder decoder;
+
+	/**
+	 * Record to deserialize byte array to.
+	 */
+	private SpecificRecord record;
+
+	/**
+	 * Creates an Avro deserialization schema for the given record.
+	 *
+	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
+	 */
+	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumReader = new SpecificDatumReader<>(schema);
+		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
+	@Override
+	public Row deserialize(byte[] message) throws IOException {
+		// read record
+		try {
+			inputStream.setBuffer(message);
+			this.record = datumReader.read(record, decoder);
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to deserialize Row.", e);
+		}
+
+		// convert to row
+		final Object row = convertToRow(schema, record);
+		return (Row) row;
+	}
+
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.writeObject(recordClazz);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumReader = new SpecificDatumReader<>(schema);
+		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
+	/**
+	 * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
+	 * Avro's {@link Utf8} fields are converted into regular Java strings.
+	 */
+	private static Object convertToRow(Schema schema, Object recordObj) {
+		if (recordObj instanceof GenericRecord) {
+			// records can be wrapped in a union
+			if (schema.getType() == Schema.Type.UNION) {
+				final List<Schema> types = schema.getTypes();
+				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+					schema = types.get(1);
+				}
+				else {
+					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
+				}
+			} else if (schema.getType() != Schema.Type.RECORD) {
+				throw new RuntimeException("Record type for row type expected. But is: " + schema);
+			}
+			final List<Schema.Field> fields = schema.getFields();
+			final Row row = new Row(fields.size());
+			final GenericRecord record = (GenericRecord) recordObj;
+			for (int i = 0; i < fields.size(); i++) {
+				final Schema.Field field = fields.get(i);
+				row.setField(i, convertToRow(field.schema(), record.get(field.pos())));
+			}
+			return row;
+		} else if (recordObj instanceof Utf8) {
+			return recordObj.toString();
+		} else {
+			return recordObj;
+		}
+	}
+
+	/**
+	 * An extension of ByteArrayInputStream that allows the underlying buffer to be swapped
+	 * without creating a new ByteArrayInputStream instance. This makes it possible to reuse
+	 * the same InputStream and Decoder instances for every message, instead of copying each
+	 * message and creating a new Decoder per message.
+	 */
+	private static final class MutableByteArrayInputStream extends ByteArrayInputStream {
+
+		public MutableByteArrayInputStream() {
+			super(new byte[0]);
+		}
+
+		/**
+		 * Set buffer that can be read via the InputStream interface and reset the input stream.
+		 * This has the same effect as creating a new ByteArrayInputStream with a new buffer.
+		 *
+		 * @param buf the new buffer to read.
+		 */
+		public void setBuffer(byte[] buf) {
+			this.buf = buf;
+			this.pos = 0;
+			this.count = buf.length;
+		}
+	}
+}

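Deserialization schemas like this one are usually handed to a source connector rather than invoked directly. A sketch with the Kafka 0.10 consumer (requires the flink-connector-kafka-0.10 dependency; the topic, properties, and the Avro-generated User class are all assumptions for illustration):

import java.util.Properties;

import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.types.Row;

public class AvroRowSourceSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
		props.setProperty("group.id", "avro-row-sketch");         // assumption

		DataStream<Row> rows = env.addSource(new FlinkKafkaConsumer010<>(
				"users",                                      // hypothetical topic
				new AvroRowDeserializationSchema(User.class), // hypothetical generated class
				props));

		rows.print();
		env.execute("Avro row source sketch");
	}
}
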
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
new file mode 100644
index 0000000..41000a6
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/**
+ * Serialization schema that serializes a {@link Row} via a {@link SpecificRecord} into Avro bytes.
+ */
+public class AvroRowSerializationSchema implements SerializationSchema<Row> {
+
+	/**
+	 * Avro record class.
+	 */
+	private Class<? extends SpecificRecord> recordClazz;
+
+	/**
+	 * Avro serialization schema.
+	 */
+	private transient Schema schema;
+
+	/**
+	 * Writer to serialize Avro record into a byte array.
+	 */
+	private transient DatumWriter<GenericRecord> datumWriter;
+
+	/**
+	 * Output stream to serialize records into byte array.
+	 */
+	private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+
+	/**
+	 * Low-level class for serialization of Avro values.
+	 */
+	private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+
+	/**
+	 * Creates an Avro serialization schema for the given record class.
+	 *
+	 * @param recordClazz Avro record class used to serialize Flink's row to Avro's record
+	 */
+	public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumWriter = new SpecificDatumWriter<>(schema);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public byte[] serialize(Row row) {
+		// convert to record
+		final Object record = convertToRecord(schema, row);
+
+		// write
+		try {
+			arrayOutputStream.reset();
+			datumWriter.write((GenericRecord) record, encoder);
+			encoder.flush();
+			return arrayOutputStream.toByteArray();
+		} catch (IOException e) {
+			throw new RuntimeException("Failed to serialize Row.", e);
+		}
+	}
+
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.writeObject(recordClazz);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
+		this.schema = SpecificData.get().getSchema(recordClazz);
+		this.datumWriter = new SpecificDatumWriter<>(schema);
+		this.arrayOutputStream = new ByteArrayOutputStream();
+		this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
+	}
+
+	/**
+	 * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
+	 * Strings are converted into Avro's {@link Utf8} fields.
+	 */
+	private static Object convertToRecord(Schema schema, Object rowObj) {
+		if (rowObj instanceof Row) {
+			// records can be wrapped in a union
+			if (schema.getType() == Schema.Type.UNION) {
+				final List<Schema> types = schema.getTypes();
+				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
+					schema = types.get(1);
+				}
+				else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
+					schema = types.get(0);
+				}
+				else {
+					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, null]. Given: " + schema);
+				}
+			} else if (schema.getType() != Schema.Type.RECORD) {
+				throw new RuntimeException("Record type for row type expected. But is: " + schema);
+			}
+			final List<Schema.Field> fields = schema.getFields();
+			final GenericRecord record = new GenericData.Record(schema);
+			final Row row = (Row) rowObj;
+			for (int i = 0; i < fields.size(); i++) {
+				final Schema.Field field = fields.get(i);
+				record.put(field.pos(), convertToRecord(field.schema(), row.getField(i)));
+			}
+			return record;
+		} else if (rowObj instanceof String) {
+			return new Utf8((String) rowObj);
+		} else {
+			return rowObj;
+		}
+	}
+}

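A quick round-trip sketch covering both row schemas (User is a hypothetical Avro-generated SpecificRecord class with fields name: string and age: int; the field order of the Row follows the Avro schema):

import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.types.Row;

public class AvroRowRoundTripSketch {
	public static void main(String[] args) throws Exception {
		// User is a hypothetical Avro-generated SpecificRecord class.
		AvroRowSerializationSchema ser = new AvroRowSerializationSchema(User.class);
		AvroRowDeserializationSchema de = new AvroRowDeserializationSchema(User.class);

		Row row = new Row(2);
		row.setField(0, "alice"); // Strings become Utf8 on the wire...
		row.setField(1, 30);

		byte[] bytes = ser.serialize(row);
		Row back = de.deserialize(bytes); // ...and come back as Java Strings
		System.out.println(back);         // prints: alice,30
	}
}
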
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
new file mode 100644
index 0000000..02f74f5
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
+ * Kryo for deep object copies. We want to change this to Kryo-only.
+ *
+ * @param <T> The type serialized.
+ */
+@Internal
+public final class AvroSerializer<T> extends TypeSerializer<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<T> type;
+
+	private final Class<? extends T> typeToInstantiate;
+
+	/**
+	 * Map of class tag (using the classname as tag) to its Kryo registration.
+	 *
+	 * <p>This map serves as a preview of the final registration result of
+	 * the Kryo instance, taking into account registration overwrites.
+	 */
+	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
+	private transient ReflectDatumWriter<T> writer;
+	private transient ReflectDatumReader<T> reader;
+
+	private transient DataOutputEncoder encoder;
+	private transient DataInputDecoder decoder;
+
+	private transient Kryo kryo;
+
+	private transient T deepCopyInstance;
+
+	// --------------------------------------------------------------------------------------------
+
+	public AvroSerializer(Class<T> type) {
+		this(type, type);
+	}
+
+	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
+		this.type = checkNotNull(type);
+		this.typeToInstantiate = checkNotNull(typeToInstantiate);
+
+		InstantiationUtil.checkForInstantiation(typeToInstantiate);
+
+		this.kryoRegistrations = buildKryoRegistrations(type);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public AvroSerializer<T> duplicate() {
+		return new AvroSerializer<T>(type, typeToInstantiate);
+	}
+
+	@Override
+	public T createInstance() {
+		return InstantiationUtil.instantiate(this.typeToInstantiate);
+	}
+
+	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+
+		return KryoUtils.copy(from, kryo, this);
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		checkKryoInitialized();
+
+		return KryoUtils.copy(from, reuse, kryo, this);
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(T value, DataOutputView target) throws IOException {
+		checkAvroInitialized();
+		this.encoder.setOut(target);
+		this.writer.write(value, this.encoder);
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		checkAvroInitialized();
+		this.decoder.setIn(source);
+		return this.reader.read(null, this.decoder);
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		checkAvroInitialized();
+		this.decoder.setIn(source);
+		return this.reader.read(reuse, this.decoder);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		checkAvroInitialized();
+
+		if (this.deepCopyInstance == null) {
+			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
+		}
+
+		this.decoder.setIn(source);
+		this.encoder.setOut(target);
+
+		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
+		this.writer.write(tmp, this.encoder);
+	}
+
+	private void checkAvroInitialized() {
+		if (this.reader == null) {
+			this.reader = new ReflectDatumReader<T>(type);
+			this.writer = new ReflectDatumWriter<T>(type);
+			this.encoder = new DataOutputEncoder();
+			this.decoder = new DataInputDecoder();
+		}
+	}
+
+	private void checkKryoInitialized() {
+		if (this.kryo == null) {
+			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
+			kryo.setAsmEnabled(true);
+
+			KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof AvroSerializer) {
+			@SuppressWarnings("unchecked")
+			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
+
+			return avroSerializer.canEqual(this) &&
+				type == avroSerializer.type &&
+				typeToInstantiate == avroSerializer.typeToInstantiate;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof AvroSerializer;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+			final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
+				// resolve Kryo registrations; currently, since the Kryo registrations in Avro
+				// are fixed, there shouldn't be a problem with the resolution here.
+
+				LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
+				oldRegistrations.putAll(kryoRegistrations);
+
+				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
+					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
+						return CompatibilityResult.requiresMigration();
+					}
+				}
+
+				this.kryoRegistrations = oldRegistrations;
+				return CompatibilityResult.compatible();
+			}
+		}
+
+		// ends up here if the preceding serializer is not
+		// an AvroSerializer, or if the serialized data type has changed
+		return CompatibilityResult.requiresMigration();
+	}
+
+	/**
+	 * {@link TypeSerializerConfigSnapshot} for Avro.
+	 */
+	public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		private Class<? extends T> typeToInstantiate;
+
+		public AvroSerializerConfigSnapshot() {}
+
+		public AvroSerializerConfigSnapshot(
+			Class<T> baseType,
+			Class<? extends T> typeToInstantiate,
+			LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
+
+			super(baseType, kryoRegistrations);
+			this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+
+			out.writeUTF(typeToInstantiate.getName());
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+
+			String classname = in.readUTF();
+			try {
+				typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
+			} catch (ClassNotFoundException e) {
+				throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
+			}
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		public Class<? extends T> getTypeToInstantiate() {
+			return typeToInstantiate;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		// kryoRegistrations may be null if this Avro serializer is deserialized from an old version
+		if (kryoRegistrations == null) {
+			this.kryoRegistrations = buildKryoRegistrations(type);
+		}
+	}
+
+	private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
+		final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();
+
+		// register Avro types.
+		registrations.put(
+				GenericData.Array.class.getName(),
+				new KryoRegistration(
+						GenericData.Array.class,
+						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+		registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
+		registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
+		registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
+		registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));
+
+		// register the serialized data type
+		registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));
+
+		return registrations;
+	}
+}

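A standalone sketch of the serializer in action, using Flink's stream wrappers to provide the DataOutputView/DataInputView arguments (the Point POJO is made up; the reflect-based reader/writer handle plain POJOs like it):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.formats.avro.typeutils.AvroSerializer;

public class AvroSerializerSketch {

	/** Simple POJO; Avro's ReflectDatumWriter/ReflectDatumReader can handle it. */
	public static class Point {
		public double x;
		public double y;
		public Point() {}
		public Point(double x, double y) { this.x = x; this.y = y; }
	}

	public static void main(String[] args) throws Exception {
		AvroSerializer<Point> serializer = new AvroSerializer<>(Point.class);

		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		serializer.serialize(new Point(1.0, 2.0), new DataOutputViewStreamWrapper(bytes));

		Point copy = serializer.deserialize(new DataInputViewStreamWrapper(
				new ByteArrayInputStream(bytes.toByteArray())));
		System.out.println(copy.x + "," + copy.y); // 1.0,2.0
	}
}
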
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..ddc89a8
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Special type information for Avro POJOs, i.e. the typed Avro classes that implement SpecificRecordBase.
+ *
+ * <p>Approach: it runs a regular POJO type analysis and replaces every {@code GenericType<CharSequence>}
+ * with a {@code GenericType<avro.Utf8>}. All other types used by Avro are standard Java types;
+ * only strings are declared as CharSequence fields and represented as Utf8 instances at runtime.
+ * Since CharSequence is not comparable, we replace those fields with generic type infos containing
+ * the (comparable) Utf8 class, which makes them nicely usable in field expressions.
+ *
+ * <p>This class is checked by the AvroPojoTest.
+ */
+public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
+
+	public AvroTypeInfo(Class<T> typeClass) {
+		super(typeClass, generateFieldsFromAvroSchema(typeClass));
+	}
+
+	@Override
+	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+		return super.createSerializer(config);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Internal
+	public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
+		PojoTypeExtractor pte = new PojoTypeExtractor();
+		ArrayList<Type> typeHierarchy = new ArrayList<>();
+		typeHierarchy.add(typeClass);
+		TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
+
+		if (!(ti instanceof PojoTypeInfo)) {
+			throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
+		}
+		PojoTypeInfo pti =  (PojoTypeInfo) ti;
+		List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
+
+		for (int i = 0; i < pti.getArity(); i++) {
+			PojoField f = pti.getPojoFieldAt(i);
+			TypeInformation newType = f.getTypeInformation();
+			// check if type is a CharSequence
+			if (newType instanceof GenericTypeInfo) {
+				if ((newType).getTypeClass().equals(CharSequence.class)) {
+					// replace the type by a org.apache.avro.util.Utf8
+					newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
+				}
+			}
+			PojoField newField = new PojoField(f.getField(), newType);
+			newFields.add(newField);
+		}
+		return newFields;
+	}
+
+	private static class PojoTypeExtractor extends TypeExtractor {
+		private PojoTypeExtractor() {
+			super();
+		}
+
+		@Override
+		public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
+				ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+			return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type);
+		}
+	}
+}
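
For illustration only (not part of this commit): a minimal sketch of how the AvroTypeInfo above might be used, assuming a hypothetical Avro-generated class `User` that extends SpecificRecordBase, plus the usual Flink imports (ExecutionConfig, TypeSerializer).

    // 'User' is a placeholder for any Avro-generated SpecificRecordBase class.
    AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);

    // String fields are analyzed as GenericType<Utf8>, which is comparable,
    // so they work in field expressions such as keyBy("name").
    TypeSerializer<User> serializer = typeInfo.createSerializer(new ExecutionConfig());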

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
new file mode 100644
index 0000000..7305f23
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * Utilities for integrating Avro serializers in Kryo.
+ */
+public class AvroKryoSerializerUtils {
+
+	public static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
+		// Avro POJOs contain java.util.List fields whose runtime type is GenericData.Array.
+		// Because Kryo is not able to serialize them properly, we use this serializer for them.
+		reg.registerTypeWithKryoSerializer(GenericData.Array.class, Serializers.SpecificInstanceCollectionSerializerForArrayList.class);
+
+		// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
+		// Kryo is able to serialize everything in there, except for the Schema.
+		// This serializer is very slow, but serializing GenericData.Record through Kryo is in general a bad idea anyway.
+		// We add it as a default serializer because Avro uses a private Schema sub-type at runtime.
+		reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
+	}
+
+	/**
+	 * Slow serialization approach for Avro schemas.
+	 * This is only used with {@link org.apache.avro.generic.GenericData.Record} types.
+	 * With this serializer in place, we are able to handle Avro records.
+	 */
+	public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void write(Kryo kryo, Output output, Schema object) {
+			String schemaAsString = object.toString(false);
+			output.writeString(schemaAsString);
+		}
+
+		@Override
+		public Schema read(Kryo kryo, Input input, Class<Schema> type) {
+			String schemaAsString = input.readString();
+			// the parser seems to be stateful, so we need a new one for every type.
+			Schema.Parser sParser = new Schema.Parser();
+			return sParser.parse(schemaAsString);
+		}
+	}
+}
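
A short usage sketch (illustrative, not part of the commit), assuming the standard Flink ExecutionConfig API:

    ExecutionConfig config = new ExecutionConfig();

    // Registers the GenericData.Array collection serializer and the (slow)
    // AvroSchemaSerializer as a default Kryo serializer for Schema sub-types.
    AvroKryoSerializerUtils.addAvroSerializers(config, GenericData.Record.class);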

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
new file mode 100644
index 0000000..32032cc
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataInputDecoder.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link Decoder} that reads from a {@link DataInput}.
+ */
+public class DataInputDecoder extends Decoder {
+
+	private final Utf8 stringDecoder = new Utf8();
+
+	private DataInput in;
+
+	public void setIn(DataInput in) {
+		this.in = in;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// primitives
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void readNull() {}
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		return in.readBoolean();
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		return in.readInt();
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		return in.readLong();
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return in.readFloat();
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return in.readDouble();
+	}
+
+	@Override
+	public int readEnum() throws IOException {
+		return readInt();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// bytes
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void readFixed(byte[] bytes, int start, int length) throws IOException {
+		in.readFully(bytes, start, length);
+	}
+
+	@Override
+	public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+		int length = readInt();
+		ByteBuffer result;
+		if (old != null && length <= old.capacity() && old.hasArray()) {
+			result = old;
+			result.clear();
+		} else {
+			result = ByteBuffer.allocate(length);
+		}
+		in.readFully(result.array(), result.arrayOffset() + result.position(), length);
+		result.limit(length);
+		return result;
+	}
+
+	@Override
+	public void skipFixed(int length) throws IOException {
+		skipBytes(length);
+	}
+
+	@Override
+	public void skipBytes() throws IOException {
+		int num = readInt();
+		skipBytes(num);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// strings
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Utf8 readString(Utf8 old) throws IOException {
+		int length = readInt();
+		Utf8 result = (old != null ? old : new Utf8());
+		result.setByteLength(length);
+
+		if (length > 0) {
+			in.readFully(result.getBytes(), 0, length);
+		}
+
+		return result;
+	}
+
+	@Override
+	public String readString() throws IOException {
+		return readString(stringDecoder).toString();
+	}
+
+	@Override
+	public void skipString() throws IOException {
+		int len = readInt();
+		skipBytes(len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// collection types
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public long readArrayStart() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long arrayNext() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long skipArray() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long readMapStart() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long mapNext() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long skipMap() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// union
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public int readIndex() throws IOException {
+		return readInt();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// utils
+	// --------------------------------------------------------------------------------------------
+
+	private void skipBytes(int num) throws IOException {
+		while (num > 0) {
+			num -= in.skipBytes(num);
+		}
+	}
+
+	public static long readVarLongCount(DataInput in) throws IOException {
+		long value = in.readUnsignedByte();
+
+		if ((value & 0x80) == 0) {
+			return value;
+		}
+		else {
+			long curr;
+			int shift = 7;
+			value = value & 0x7f;
+			while (((curr = in.readUnsignedByte()) & 0x80) != 0) {
+				value |= (curr & 0x7f) << shift;
+				shift += 7;
+			}
+			value |= curr << shift;
+			return value;
+		}
+	}
+}
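
A worked example of the var-length count format read by readVarLongCount (illustrative, not part of the commit): each byte carries seven payload bits, least-significant group first, and a set high bit marks a continuation byte. The count 300 (binary 100101100) therefore encodes as the two bytes 0xAC, 0x02.

    // decode { 0xAC, 0x02 }: first byte contributes 0x2C, second contributes 2 << 7 = 256
    byte[] encoded = { (byte) 0xAC, 0x02 };
    long count = DataInputDecoder.readVarLongCount(
            new DataInputStream(new ByteArrayInputStream(encoded)));
    // count == 300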

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
new file mode 100644
index 0000000..c2d490b
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/DataOutputEncoder.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link Encoder} that writes data to a {@link DataOutput}.
+ */
+public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private DataOutput out;
+
+	public void setOut(DataOutput out) {
+		this.out = out;
+	}
+
+	@Override
+	public void flush() throws IOException {}
+
+	// --------------------------------------------------------------------------------------------
+	// primitives
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeNull() {}
+
+	@Override
+	public void writeBoolean(boolean b) throws IOException {
+		out.writeBoolean(b);
+	}
+
+	@Override
+	public void writeInt(int n) throws IOException {
+		out.writeInt(n);
+	}
+
+	@Override
+	public void writeLong(long n) throws IOException {
+		out.writeLong(n);
+	}
+
+	@Override
+	public void writeFloat(float f) throws IOException {
+		out.writeFloat(f);
+	}
+
+	@Override
+	public void writeDouble(double d) throws IOException {
+		out.writeDouble(d);
+	}
+
+	@Override
+	public void writeEnum(int e) throws IOException {
+		out.writeInt(e);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// bytes
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+		out.write(bytes, start, len);
+	}
+
+	@Override
+	public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+		out.writeInt(len);
+		if (len > 0) {
+			out.write(bytes, start, len);
+		}
+	}
+
+	@Override
+	public void writeBytes(ByteBuffer bytes) throws IOException {
+		int num = bytes.remaining();
+		out.writeInt(num);
+
+		if (num > 0) {
+			writeFixed(bytes);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// strings
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeString(String str) throws IOException {
+		byte[] bytes = Utf8.getBytesFor(str);
+		writeBytes(bytes, 0, bytes.length);
+	}
+
+	@Override
+	public void writeString(Utf8 utf8) throws IOException {
+		writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// collection types
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeArrayStart() {}
+
+	@Override
+	public void setItemCount(long itemCount) throws IOException {
+		if (itemCount > 0) {
+			writeVarLongCount(out, itemCount);
+		}
+	}
+
+	@Override
+	public void startItem() {}
+
+	@Override
+	public void writeArrayEnd() throws IOException {
+		// write a single byte 0, shortcut for a var-length long of 0
+		out.write(0);
+	}
+
+	@Override
+	public void writeMapStart() {}
+
+	@Override
+	public void writeMapEnd() throws IOException {
+		// write a single byte 0, shortcut for a var-length long of 0
+		out.write(0);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// union
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeIndex(int unionIndex) throws IOException {
+		out.writeInt(unionIndex);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// utils
+	// --------------------------------------------------------------------------------------------
+
+	public static void writeVarLongCount(DataOutput out, long val) throws IOException {
+		if (val < 0) {
+			throw new IOException("Illegal count (must be non-negative): " + val);
+		}
+
+		while ((val & ~0x7FL) != 0) {
+			out.write(((int) val) | 0x80);
+			val >>>= 7;
+		}
+		out.write((int) val);
+	}
+}
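
Because the encoder above and the DataInputDecoder are symmetric, a simple round trip over byte arrays shows how they are meant to pair up (a sketch, not part of the commit):

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputEncoder encoder = new DataOutputEncoder();
    encoder.setOut(new DataOutputStream(bytes));
    encoder.writeString("hello");
    encoder.writeLong(42L);

    DataInputDecoder decoder = new DataInputDecoder();
    decoder.setIn(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    String s = decoder.readString(); // "hello"
    long n = decoder.readLong();     // 42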

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..c00fecb
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/FSDataInputStreamWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import org.apache.avro.file.SeekableInput;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Code copied from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+	private final FSDataInputStream stream;
+	private long pos;
+	private long len;
+
+	public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+		this.stream = stream;
+		this.pos = 0;
+		this.len = len;
+	}
+
+	public long length() throws IOException {
+		return this.len;
+	}
+
+	public int read(byte[] b, int off, int len) throws IOException {
+		int read = stream.read(b, off, len);
+		// at end-of-stream, read(...) returns -1, which must not move the tracked position
+		if (read > 0) {
+			pos += read;
+		}
+		return read;
+	}
+
+	public void seek(long p) throws IOException {
+		stream.seek(p);
+		pos = p;
+	}
+
+	public long tell() throws IOException {
+		return pos;
+	}
+
+	public void close() throws IOException {
+		stream.close();
+	}
+}
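
A sketch of the intended use (not part of the commit): wrapping a Flink FSDataInputStream so that Avro's DataFileReader, which requires a SeekableInput, can read an Avro container file. The path below is hypothetical.

    Path path = new Path("hdfs:///data/records.avro"); // hypothetical location
    FileSystem fs = path.getFileSystem();
    long len = fs.getFileStatus(path).getLen();

    try (FSDataInputStreamWrapper in = new FSDataInputStreamWrapper(fs.open(path), len);
         DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(in, new GenericDatumReader<GenericRecord>())) {
        while (reader.hasNext()) {
            GenericRecord record = reader.next();
            // process the record
        }
    }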

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/assembly/test-assembly.xml b/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..8361693
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory></outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/formats/avro/testjar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..985471a
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class AvroExternalJarProgramITCase extends TestLogger {
+
+	private static final String JAR_FILE = "maven-test-jar.jar";
+
+	private static final String TEST_DATA_FILE = "/testdata.avro";
+
+	@Test
+	public void testExternalProgram() {
+
+		LocalFlinkMiniCluster testMiniCluster = null;
+
+		try {
+			int parallelism = 4;
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+			testMiniCluster = new LocalFlinkMiniCluster(config, false);
+			testMiniCluster.start();
+
+			String jarFile = JAR_FILE;
+			String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+			TestEnvironment.setAsContext(
+				testMiniCluster,
+				parallelism,
+				Collections.singleton(new Path(jarFile)),
+				Collections.<URL>emptyList());
+
+			config.setString(JobManagerOptions.ADDRESS, "localhost");
+			config.setInteger(JobManagerOptions.PORT, testMiniCluster.getLeaderRPCPort());
+
+			program.invokeInteractiveModeForExecution();
+		}
+		catch (Throwable t) {
+			System.err.println(t.getMessage());
+			t.printStackTrace();
+			Assert.fail("Error during the packaged program execution: " + t.getMessage());
+		}
+		finally {
+			TestEnvironment.unsetAsContext();
+
+			if (testMiniCluster != null) {
+				try {
+					testMiniCluster.stop();
+				} catch (Throwable t) {
+					// ignore
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
new file mode 100644
index 0000000..bc4f253
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroInputFormatTypeExtractionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the type extraction of the {@link AvroInputFormat}.
+ */
+public class AvroInputFormatTypeExtractionTest {
+
+	@Test
+	public void testTypeExtraction() {
+		try {
+			InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
+
+			TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
+
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			DataSet<MyAvroType> input = env.createInput(format);
+			TypeInformation<?> typeInfoDataSet = input.getType();
+
+			Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
+			Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
+
+			Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
+			Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Test type.
+	 */
+	public static final class MyAvroType {
+
+		public String theString;
+
+		public MyAvroType recursive;
+
+		private double aDouble;
+
+		public double getaDouble() {
+			return aDouble;
+		}
+
+		public void setaDouble(double aDouble) {
+			this.aDouble = aDouble;
+		}
+
+		public void setTheString(String theString) {
+			this.theString = theString;
+		}
+
+		public String getTheString() {
+			return theString;
+		}
+	}
+}