Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/02 13:35:35 UTC

[45/51] [abbrv] [partial] flink git commit: [FLINK-4676] [connectors] Merge batch and streaming connectors into common Maven module.

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
new file mode 100644
index 0000000..73067c1
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a {@link FileInputFormat} for Avro records.
+ *
+ * @param <E>
+ *            the type of the result Avro record. If you specify
+ *            {@link GenericRecord} then the result will be returned as a
+ *            {@link GenericRecord}, so you do not have to know the schema ahead
+ *            of time.
+ */
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
+	CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
+
+	private final Class<E> avroValueType;
+	
+	private boolean reuseAvroValue = true;
+
+	private transient DataFileReader<E> dataFileReader;
+
+	private transient long end;
+
+	private transient long recordsReadSinceLastSync;
+
+	private long lastSync = -1L;
+
+	public AvroInputFormat(Path filePath, Class<E> type) {
+		super(filePath);
+		this.avroValueType = type;
+	}
+
+	/**
+	 * Sets the flag whether to reuse the Avro value instance for all records.
+	 * By default, the input format reuses the Avro value.
+	 *
+	 * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
+	 */
+	public void setReuseAvroValue(boolean reuseAvroValue) {
+		this.reuseAvroValue = reuseAvroValue;
+	}
+
+	/**
+	 * If set to true, the InputFormat will not split files and always reads entire files.
+	 */
+	public void setUnsplittable(boolean unsplittable) {
+		this.unsplittable = unsplittable;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Typing
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public TypeInformation<E> getProducedType() {
+		return TypeExtractor.getForClass(this.avroValueType);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Input Format Methods
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		dataFileReader = initReader(split);
+		dataFileReader.sync(split.getStart());
+		lastSync = dataFileReader.previousSync();
+	}
+
+	private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
+		DatumReader<E> datumReader;
+
+		if (GenericRecord.class == avroValueType) {
+			datumReader = new GenericDatumReader<E>();
+		} else {
+			datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
+				? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
+		}
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Opening split {}", split);
+		}
+
+		SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
+		DataFileReader<E> dataFileReader = (DataFileReader<E>) DataFileReader.openReader(in, datumReader);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
+		}
+
+		end = split.getStart() + split.getLength();
+		recordsReadSinceLastSync = 0;
+		return dataFileReader;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
+	}
+
+	public long getRecordsReadFromBlock() {
+		return this.recordsReadSinceLastSync;
+	}
+
+	@Override
+	public E nextRecord(E reuseValue) throws IOException {
+		if (reachedEnd()) {
+			return null;
+		}
+
+		// if we start a new block, then remember the new sync point and
+		// reset the record counter.
+		if (dataFileReader.previousSync() != lastSync) {
+			lastSync = dataFileReader.previousSync();
+			recordsReadSinceLastSync = 0;
+		}
+		recordsReadSinceLastSync++;
+
+		if (reuseAvroValue) {
+			return dataFileReader.next(reuseValue);
+		} else {
+			if (GenericRecord.class == avroValueType) {
+				return dataFileReader.next();
+			} else {
+				return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() throws IOException {
+		return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		try {
+			this.open(split);
+		} finally {
+			if (state.f0 != -1) {
+				lastSync = state.f0;
+				recordsReadSinceLastSync = state.f1;
+			}
+		}
+
+		if (lastSync != -1) {
+			// seek to the last sync point and re-read (and discard) all records
+			// up to the position we had reached before the checkpoint
+			dataFileReader.seek(lastSync);
+			for (int i = 0; i < recordsReadSinceLastSync; i++) {
+				dataFileReader.next(null);
+			}
+		}
+	}
+}
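
For orientation, a minimal usage sketch of the input format above, assuming
the DataSet API and a placeholder local file path (class and path names are
illustrative, not part of this commit):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.core.fs.Path;

    public class AvroReadSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // With GenericRecord, the schema is read from the Avro file header
            // at runtime, so no generated record classes are required.
            AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(
                    new Path("file:///tmp/users.avro"), GenericRecord.class);

            // Hand out a fresh record per nextRecord() call instead of
            // mutating one shared instance (the default is to reuse).
            format.setReuseAvroValue(false);

            DataSet<GenericRecord> records = env.createInput(format);
            records.print();
        }
    }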

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
new file mode 100644
index 0000000..600d1e5
--- /dev/null
+++ b/flink-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+
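+/**
+ * A {@link FileOutputFormat} that writes records into an Avro container file.
+ * Avro specific records are written with their generated schema; all other
+ * types are written via Avro's reflection-based schema (see {@link #open(int, int)}).
+ *
+ * @param <E> the type of the records to write
+ */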
+public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
+
+	/**
+	 * Wrapper which encapsulates the supported codec and a related serialization byte.
+	 */
+	public enum Codec {
+
+		NULL((byte)0, CodecFactory.nullCodec()),
+		SNAPPY((byte)1, CodecFactory.snappyCodec()),
+		BZIP2((byte)2, CodecFactory.bzip2Codec()),
+		DEFLATE((byte)3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)),
+		XZ((byte)4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));
+
+		private byte codecByte;
+
+		private CodecFactory codecFactory;
+
+		Codec(final byte codecByte, final CodecFactory codecFactory) {
+			this.codecByte = codecByte;
+			this.codecFactory = codecFactory;
+		}
+
+		private byte getCodecByte() {
+			return codecByte;
+		}
+
+		private CodecFactory getCodecFactory() {
+			return codecFactory;
+		}
+
+		private static Codec forCodecByte(byte codecByte) {
+			for (final Codec codec : Codec.values()) {
+				if (codec.getCodecByte() == codecByte) {
+					return codec;
+				}
+			}
+			throw new IllegalArgumentException("no codec for codecByte: " + codecByte);
+		}
+	}
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<E> avroValueType;
+
+	private transient Schema userDefinedSchema = null;
+
+	private transient Codec codec = null;
+	
+	private transient DataFileWriter<E> dataFileWriter;
+
+	public AvroOutputFormat(Path filePath, Class<E> type) {
+		super(filePath);
+		this.avroValueType = type;
+	}
+
+	public AvroOutputFormat(Class<E> type) {
+		this.avroValueType = type;
+	}
+
+	@Override
+	protected String getDirectoryFileName(int taskNumber) {
+		return super.getDirectoryFileName(taskNumber) + ".avro";
+	}
+
+	public void setSchema(Schema schema) {
+		this.userDefinedSchema = schema;
+	}
+
+	/**
+	 * Set avro codec for compression.
+	 *
+	 * @param codec avro codec.
+	 */
+	public void setCodec(final Codec codec) {
+		this.codec = checkNotNull(codec, "codec can not be null");
+	}
+
+	@Override
+	public void writeRecord(E record) throws IOException {
+		dataFileWriter.append(record);
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+
+		DatumWriter<E> datumWriter;
+		Schema schema;
+		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
+			datumWriter = new SpecificDatumWriter<E>(avroValueType);
+			try {
+				schema = ((org.apache.avro.specific.SpecificRecordBase) avroValueType.newInstance()).getSchema();
+			} catch (InstantiationException | IllegalAccessException e) {
+				throw new RuntimeException("Could not instantiate the Avro record type to extract its schema.", e);
+			}
+		} else {
+			datumWriter = new ReflectDatumWriter<E>(avroValueType);
+			schema = ReflectData.get().getSchema(avroValueType);
+		}
+		dataFileWriter = new DataFileWriter<E>(datumWriter);
+		if (codec != null) {
+			dataFileWriter.setCodec(codec.getCodecFactory());
+		}
+		if (userDefinedSchema == null) {
+			dataFileWriter.create(schema, stream);
+		} else {
+			dataFileWriter.create(userDefinedSchema, stream);
+		}
+	}
+
+	private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+
+		if (codec != null) {
+			out.writeByte(codec.getCodecByte());
+		} else {
+			out.writeByte(-1);
+		}
+
+		if (userDefinedSchema != null) {
+			byte[] json = userDefinedSchema.toString().getBytes(StandardCharsets.UTF_8);
+			out.writeInt(json.length);
+			out.write(json);
+		} else {
+			out.writeInt(0);
+		}
+	}
+
+	private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		byte codecByte = in.readByte();
+		if (codecByte >= 0) {
+			setCodec(Codec.forCodecByte(codecByte));
+		}
+
+		int length = in.readInt();
+		if (length != 0) {
+			byte[] json = new byte[length];
+			in.readFully(json);
+
+			Schema schema = new Schema.Parser().parse(new String(json, StandardCharsets.UTF_8));
+			setSchema(schema);
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		dataFileWriter.flush();
+		dataFileWriter.close();
+		super.close();
+	}
+}
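
A matching write-side sketch, again with illustrative names: a plain POJO is
written through the reflect path shown in open() above, with deflate
compression enabled via setCodec():

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroOutputFormat;
    import org.apache.flink.core.fs.Path;

    public class AvroWriteSketch {

        /** Plain POJO; AvroOutputFormat derives its schema via Avro reflection. */
        public static class Color {
            public String name;
            public double saturation;

            public Color() {}

            public Color(String name, double saturation) {
                this.name = name;
                this.saturation = saturation;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Color> colors = env.fromElements(
                    new Color("red", 1.0), new Color("blue", 0.5));

            AvroOutputFormat<Color> format =
                    new AvroOutputFormat<>(new Path("file:///tmp/colors.avro"), Color.class);
            // Compress the data blocks; the codec byte also survives Java
            // serialization of the format (see writeObject/readObject above).
            format.setCodec(AvroOutputFormat.Codec.DEFLATE);

            colors.output(format);
            env.execute("avro write sketch");
        }
    }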

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..0f4561a
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/api/avro/testjar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..1030ff8
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.File;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AvroExternalJarProgramITCase {
+
+	private static final String JAR_FILE = "maven-test-jar.jar";
+
+	private static final String TEST_DATA_FILE = "/testdata.avro";
+
+	@Test
+	public void testExternalProgram() {
+
+		LocalFlinkMiniCluster testMiniCluster = null;
+
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+			testMiniCluster = new LocalFlinkMiniCluster(config, false);
+			testMiniCluster.start();
+
+			String jarFile = JAR_FILE;
+			String testData = getClass().getResource(TEST_DATA_FILE).toString();
+
+			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+
+
+			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
+
+			ClusterClient client = new StandaloneClusterClient(config);
+
+			client.setPrintStatusDuringExecution(false);
+			client.run(program, 4);
+
+		}
+		catch (Throwable t) {
+			System.err.println(t.getMessage());
+			t.printStackTrace();
+			Assert.fail("Error during the packaged program execution: " + t.getMessage());
+		}
+		finally {
+			if (testMiniCluster != null) {
+				try {
+					testMiniCluster.stop();
+				} catch (Throwable t) {
+					// ignore
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
new file mode 100644
index 0000000..3b01ccb
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class AvroOutputFormatITCase extends JavaProgramTestBase {
+
+	public static String outputPath1;
+
+	public static String outputPath2;
+
+	public static String inputPath;
+
+	public static String userData = "alice|1|blue\n" +
+		"bob|2|red\n" +
+		"john|3|yellow\n" +
+		"walt|4|black\n";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		inputPath = createTempFile("user", userData);
+		outputPath1 = getTempDirPath("avro_output1");
+		outputPath2 = getTempDirPath("avro_output2");
+	}
+
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
+			.fieldDelimiter("|")
+			.types(String.class, Integer.class, String.class);
+
+		//output the data with AvroOutputFormat for specific user type
+		DataSet<User> specificUser = input.map(new ConvertToUser());
+		AvroOutputFormat<User> avroOutputFormat = new AvroOutputFormat<User>(User.class);
+		avroOutputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY); // FLINK-4771: use a codec
+		avroOutputFormat.setSchema(User.SCHEMA$); //FLINK-3304: Ensure the OF is properly serializing the schema
+		specificUser.write(avroOutputFormat, outputPath1);
+
+		//output the data with AvroOutputFormat for reflect user type
+		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
+		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		//compare result for specific user type
+		File[] output1;
+		File file1 = asFile(outputPath1);
+		if (file1.isDirectory()) {
+			output1 = file1.listFiles();
+			// check for avro ext in dir.
+			for (File avroOutput : output1) {
+				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
+			}
+		} else {
+			output1 = new File[] {file1};
+		}
+		List<String> result1 = new ArrayList<String>();
+		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
+		for (File avroOutput : output1) {
+
+			DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
+			while (dataFileReader1.hasNext()) {
+				User user = dataFileReader1.next();
+				result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+			}
+		}
+		for (String expectedResult : userData.split("\n")) {
+			Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
+		}
+
+		//compare result for reflect user type
+		File[] output2;
+		File file2 = asFile(outputPath2);
+		if (file2.isDirectory()) {
+			output2 = file2.listFiles();
+		} else {
+			output2 = new File[] {file2};
+		}
+		List<String> result2 = new ArrayList<String>();
+		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
+		for (File avroOutput : output2) {
+			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
+			while (dataFileReader2.hasNext()) {
+				ReflectiveUser user = dataFileReader2.next();
+				result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+			}
+		}
+		for (String expectedResult : userData.split("\n")) {
+			Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
+		}
+
+
+	}
+
+
+	public static final class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
+
+		@Override
+		public User map(Tuple3<String, Integer, String> value) throws Exception {
+			return new User(value.f0, value.f1, value.f2);
+		}
+	}
+
+	public static final class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
+
+		@Override
+		public ReflectiveUser map(User value) throws Exception {
+			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
+		}
+	}
+
+	
+	public static class ReflectiveUser {
+		private String name;
+		private int favoriteNumber;
+		private String favoriteColor;
+
+		public ReflectiveUser() {}
+
+		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
+			this.name = name;
+			this.favoriteNumber = favoriteNumber;
+			this.favoriteColor = favoriteColor;
+		}
+		
+		public String getName() {
+			return this.name;
+		}
+		public String getFavoriteColor() {
+			return this.favoriteColor;
+		}
+		public int getFavoriteNumber() {
+			return this.favoriteNumber;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
new file mode 100644
index 0000000..c39db15
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.api.io.avro.generated.Address;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.Fixed16;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.util.StringUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
+ */
+public class EncoderDecoderTest {
+	@Test
+	public void testComplexStringsDirectly() {
+		try {
+			Random rnd = new Random(349712539451944123L);
+			
+			for (int i = 0; i < 10; i++) {
+				String testString = StringUtils.getRandomString(rnd, 10, 100);
+				
+				ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+				{
+					DataOutputStream dataOut = new DataOutputStream(baos);
+					DataOutputEncoder encoder = new DataOutputEncoder();
+					encoder.setOut(dataOut);
+					
+					encoder.writeString(testString);
+					dataOut.flush();
+					dataOut.close();
+				}
+				
+				byte[] data = baos.toByteArray();
+				
+				// deserialize
+				{
+					ByteArrayInputStream bais = new ByteArrayInputStream(data);
+					DataInputStream dataIn = new DataInputStream(bais);
+					DataInputDecoder decoder = new DataInputDecoder();
+					decoder.setIn(dataIn);
+	
+					String deserialized = decoder.readString();
+					
+					assertEquals(testString, deserialized);
+				}
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test failed due to an exception: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPrimitiveTypes() {
+		
+		testObjectSerialization(Boolean.TRUE);
+		testObjectSerialization(Boolean.FALSE);
+		
+		testObjectSerialization(Byte.valueOf((byte) 0));
+		testObjectSerialization(Byte.valueOf((byte) 1));
+		testObjectSerialization(Byte.valueOf((byte) -1));
+		testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
+		testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
+		
+		testObjectSerialization(Short.valueOf((short) 0));
+		testObjectSerialization(Short.valueOf((short) 1));
+		testObjectSerialization(Short.valueOf((short) -1));
+		testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
+		testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
+		
+		testObjectSerialization(Integer.valueOf(0));
+		testObjectSerialization(Integer.valueOf(1));
+		testObjectSerialization(Integer.valueOf(-1));
+		testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
+		testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
+		
+		testObjectSerialization(Long.valueOf(0));
+		testObjectSerialization(Long.valueOf(1));
+		testObjectSerialization(Long.valueOf(-1));
+		testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
+		testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
+		
+		testObjectSerialization(Float.valueOf(0));
+		testObjectSerialization(Float.valueOf(1));
+		testObjectSerialization(Float.valueOf(-1));
+		testObjectSerialization(Float.valueOf((float)Math.E));
+		testObjectSerialization(Float.valueOf((float)Math.PI));
+		testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
+		testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
+		testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
+		testObjectSerialization(Float.valueOf(Float.NaN));
+		testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
+		testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
+		
+		testObjectSerialization(Double.valueOf(0));
+		testObjectSerialization(Double.valueOf(1));
+		testObjectSerialization(Double.valueOf(-1));
+		testObjectSerialization(Double.valueOf(Math.E));
+		testObjectSerialization(Double.valueOf(Math.PI));
+		testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
+		testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
+		testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
+		testObjectSerialization(Double.valueOf(Double.NaN));
+		testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
+		testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
+		
+		testObjectSerialization("");
+		testObjectSerialization("abcdefg");
+		testObjectSerialization("ab\u1535\u0155xyz\u706F");
+		
+		testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
+		testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
+	}
+	
+	@Test
+	public void testArrayTypes() {
+		{
+			int[] array = new int[] {1, 2, 3, 4, 5};
+			testObjectSerialization(array);
+		}
+		{
+			long[] array = new long[] {1, 2, 3, 4, 5};
+			testObjectSerialization(array);
+		}
+		{
+			float[] array = new float[] {1, 2, 3, 4, 5};
+			testObjectSerialization(array);
+		}
+		{
+			double[] array = new double[] {1, 2, 3, 4, 5};
+			testObjectSerialization(array);
+		}
+		{
+			String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"};
+			testObjectSerialization(array);
+		}
+	}
+	
+	@Test
+	public void testEmptyArray() {
+		{
+			int[] array = new int[0];
+			testObjectSerialization(array);
+		}
+		{
+			long[] array = new long[0];
+			testObjectSerialization(array);
+		}
+		{
+			float[] array = new float[0];
+			testObjectSerialization(array);
+		}
+		{
+			double[] array = new double[0];
+			testObjectSerialization(array);
+		}
+		{
+			String[] array = new String[0];
+			testObjectSerialization(array);
+		}
+	}
+	
+	@Test
+	public void testObjects() {
+		// simple object containing only primitives
+		{
+			testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
+		}
+		
+		// object with collection
+		{
+			ArrayList<String> list = new ArrayList<String>();
+			list.add("A");
+			list.add("B");
+			list.add("C");
+			list.add("D");
+			list.add("E");
+			
+			testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym"));
+		}
+		
+		// object with empty collection
+		{
+			ArrayList<String> list = new ArrayList<String>();
+			testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
+		}
+	}
+	
+	@Test
+	public void testNestedObjectsWithCollections() {
+		testObjectSerialization(new ComplexNestedObject2(true));
+	}
+	
+	@Test
+	public void testGeneratedObjectWithNullableFields() {
+		List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
+		List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
+		Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
+		map.put("1", 1L);
+		map.put("2", 2L);
+		map.put("3", 3L);
+
+		byte[] b = new byte[16];
+		new Random().nextBytes(b);
+		Fixed16 f = new Fixed16(b);
+		Address addr = new Address(239, "6th Main", "Bangalore",
+				"Karnataka", "560075");
+		User user = new User("Freudenreich", 1337, "macintosh gray",
+				1234567890L, 3.1415926, null, true, strings, bools, null,
+				Colors.GREEN, map, f, true, addr);
+		
+		testObjectSerialization(user);
+	}
+	
+	@Test
+	public void testVarLenCountEncoding() {
+		try {
+			long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
+			
+			// write
+			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+			{
+				DataOutputStream dataOut = new DataOutputStream(baos);
+				
+				for (long val : values) {
+					DataOutputEncoder.writeVarLongCount(dataOut, val);
+				}
+				
+				dataOut.flush();
+				dataOut.close();
+			}
+			
+			// read
+			{
+				ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+				DataInputStream dataIn = new DataInputStream(bais);
+				
+				for (long val : values) {
+					long read = DataInputDecoder.readVarLongCount(dataIn);
+					assertEquals("Wrong var-len encoded value read.", val, read);
+				}
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test failed due to an exception: " + e.getMessage());
+		}
+	}
+	
+	private static <X> void testObjectSerialization(X obj) {
+		
+		try {
+			
+			// serialize
+			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
+			{
+				DataOutputStream dataOut = new DataOutputStream(baos);
+				DataOutputEncoder encoder = new DataOutputEncoder();
+				encoder.setOut(dataOut);
+				
+				@SuppressWarnings("unchecked")
+				Class<X> clazz = (Class<X>) obj.getClass();
+				ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
+				
+				writer.write(obj, encoder);
+				dataOut.flush();
+				dataOut.close();
+			}
+			
+			byte[] data = baos.toByteArray();
+			X result = null;
+			
+			// deserialize
+			{
+				ByteArrayInputStream bais = new ByteArrayInputStream(data);
+				DataInputStream dataIn = new DataInputStream(bais);
+				DataInputDecoder decoder = new DataInputDecoder();
+				decoder.setIn(dataIn);
+
+				@SuppressWarnings("unchecked")
+				Class<X> clazz = (Class<X>) obj.getClass();
+				ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
+				
+				// create a reuse object if possible, otherwise we have no reuse object 
+				X reuse = null;
+				try {
+					@SuppressWarnings("unchecked")
+					X test = (X) obj.getClass().newInstance();
+					reuse = test;
+				} catch (Throwable t) {}
+				
+				result = reader.read(reuse, decoder);
+			}
+			
+			// check
+			final String message = "Deserialized object is not the same as the original";
+			
+			if (obj.getClass().isArray()) {
+				Class<?> clazz = obj.getClass();
+				if (clazz == byte[].class) {
+					assertArrayEquals(message, (byte[]) obj, (byte[]) result);
+				}
+				else if (clazz == short[].class) {
+					assertArrayEquals(message, (short[]) obj, (short[]) result);
+				}
+				else if (clazz == int[].class) {
+					assertArrayEquals(message, (int[]) obj, (int[]) result);
+				}
+				else if (clazz == long[].class) {
+					assertArrayEquals(message, (long[]) obj, (long[]) result);
+				}
+				else if (clazz == char[].class) {
+					assertArrayEquals(message, (char[]) obj, (char[]) result);
+				}
+				else if (clazz == float[].class) {
+					assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
+				}
+				else if (clazz == double[].class) {
+					assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
+				} else {
+					assertArrayEquals(message, (Object[]) obj, (Object[]) result);
+				}
+			} else {
+				assertEquals(message, obj, result);
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test failed due to an exception: " + e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Test Objects
+	// --------------------------------------------------------------------------------------------
+
+
+	public static final class SimpleTypes {
+		
+		private final int iVal;
+		private final long lVal;
+		private final byte bVal;
+		private final String sVal;
+		private final short rVal;
+		private final double dVal;
+		
+		
+		public SimpleTypes() {
+			this(0, 0, (byte) 0, "", (short) 0, 0);
+		}
+		
+		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
+			this.iVal = iVal;
+			this.lVal = lVal;
+			this.bVal = bVal;
+			this.sVal = sVal;
+			this.rVal = rVal;
+			this.dVal = dVal;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == SimpleTypes.class) {
+				SimpleTypes other = (SimpleTypes) obj;
+				
+				return other.iVal == this.iVal &&
+						other.lVal == this.lVal &&
+						other.bVal == this.bVal &&
+						other.sVal.equals(this.sVal) &&
+						other.rVal == this.rVal &&
+						other.dVal == this.dVal;
+				
+			} else {
+				return false;
+			}
+		}
+	}
+	
+	public static class ComplexNestedObject1 {
+		
+		private double doubleValue;
+		
+		private List<String> stringList;
+		
+		public ComplexNestedObject1() {}
+		
+		public ComplexNestedObject1(int offInit) {
+			this.doubleValue = 6293485.6723 + offInit;
+				
+			this.stringList = new ArrayList<String>();
+			this.stringList.add("A" + offInit);
+			this.stringList.add("somewhat" + offInit);
+			this.stringList.add("random" + offInit);
+			this.stringList.add("collection" + offInit);
+			this.stringList.add("of" + offInit);
+			this.stringList.add("strings" + offInit);
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == ComplexNestedObject1.class) {
+				ComplexNestedObject1 other = (ComplexNestedObject1) obj;
+				return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
+			} else {
+				return false;
+			}
+		}
+	}
+	
+	public static class ComplexNestedObject2 {
+		
+		private long longValue;
+		
+		private Map<String, ComplexNestedObject1> theMap;
+		
+		public ComplexNestedObject2() {}
+		
+		public ComplexNestedObject2(boolean init) {
+			this.longValue = 46547;
+				
+			this.theMap = new HashMap<String, ComplexNestedObject1>();
+			this.theMap.put("36354L", new ComplexNestedObject1(43546543));
+			this.theMap.put("785611L", new ComplexNestedObject1(45784568));
+			this.theMap.put("43L", new ComplexNestedObject1(9876543));
+			this.theMap.put("-45687L", new ComplexNestedObject1(7897615));
+			this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
+			this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == ComplexNestedObject2.class) {
+				ComplexNestedObject2 other = (ComplexNestedObject2) obj;
+				return other.longValue == this.longValue && this.theMap.equals(other.theMap);
+			} else {
+				return false;
+			}
+		}
+	}
+	
+	public static class Book {
+
+		private long bookId;
+		private String title;
+		private long authorId;
+
+		public Book() {}
+
+		public Book(long bookId, String title, long authorId) {
+			this.bookId = bookId;
+			this.title = title;
+			this.authorId = authorId;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == Book.class) {
+				Book other = (Book) obj;
+				return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
+			} else {
+				return false;
+			}
+		}
+	}
+
+	public static class BookAuthor {
+
+		private long authorId;
+		private List<String> bookTitles;
+		private String authorName;
+
+		public BookAuthor() {}
+
+		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+			this.authorId = authorId;
+			this.bookTitles = bookTitles;
+			this.authorName = authorName;
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj.getClass() == BookAuthor.class) {
+				BookAuthor other = (BookAuthor) obj;
+				return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
+						other.bookTitles.equals(this.bookTitles);
+			} else {
+				return false;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
new file mode 100644
index 0000000..1174786
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro.testjar;
+
+// ================================================================================================
+//  This file defines the classes for the AvroExternalJarProgramITCase.
+//  The program is packaged into a separate test jar (see src/test/assembly/test-assembly.xml,
+//  which includes org/apache/flink/api/avro/testjar/**), so that the test covers loading
+//  these classes from an external jar rather than from the test-classes directory.
+// ================================================================================================
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.core.fs.Path;
+
+public class AvroExternalJarProgram {
+
+	public static final class Color {
+		
+		private String name;
+		private double saturation;
+		
+		public Color() {
+			name = "";
+			saturation = 1.0;
+		}
+		
+		public Color(String name, double saturation) {
+			this.name = name;
+			this.saturation = saturation;
+		}
+		
+		public String getName() {
+			return name;
+		}
+		
+		public void setName(String name) {
+			this.name = name;
+		}
+		
+		public double getSaturation() {
+			return saturation;
+		}
+		
+		public void setSaturation(double saturation) {
+			this.saturation = saturation;
+		}
+		
+		@Override
+		public String toString() {
+			return name + '(' + saturation + ')';
+		}
+	}
+	
+	public static final class MyUser {
+		
+		private String name;
+		private List<Color> colors;
+		
+		public MyUser() {
+			name = "unknown";
+			colors = new ArrayList<Color>();
+		}
+		
+		public MyUser(String name, List<Color> colors) {
+			this.name = name;
+			this.colors = colors;
+		}
+		
+		public String getName() {
+			return name;
+		}
+		
+		public List<Color> getColors() {
+			return colors;
+		}
+		
+		public void setName(String name) {
+			this.name = name;
+		}
+		
+		public void setColors(List<Color> colors) {
+			this.colors = colors;
+		}
+		
+		@Override
+		public String toString() {
+			return name + " : " + colors;
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class NameExtractor extends RichMapFunction<MyUser, Tuple2<String, MyUser>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, MyUser> map(MyUser u) {
+			String namePrefix = u.getName().substring(0, 1);
+			return new Tuple2<String, MyUser>(namePrefix, u);
+		}
+	}
+	
+	public static final class NameGrouper extends RichReduceFunction<Tuple2<String, MyUser>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
+			return val1;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Test Data
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class Generator {
+		
+		private final Random rnd = new Random(2389756789345689276L);
+		
+		public MyUser nextUser() {
+			return randomUser();
+		}
+		
+		private MyUser randomUser() {
+			
+			int numColors = rnd.nextInt(5);
+			ArrayList<Color> colors = new ArrayList<Color>(numColors);
+			for (int i = 0; i < numColors; i++) {
+				colors.add(new Color(randomString(), rnd.nextDouble()));
+			}
+			
+			return new MyUser(randomString(), colors);
+		}
+		
+		private String randomString() {
+			char[] c = new char[this.rnd.nextInt(20) + 5];
+			
+			for (int i = 0; i < c.length; i++) {
+				c[i] = (char) (this.rnd.nextInt(150) + 40);
+			}
+			
+			return new String(c);
+		}
+	}
+	
+	public static void writeTestData(File testFile, int numRecords) throws IOException {
+		
+		DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
+		DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
+		
+		dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
+		
+		
+		Generator generator = new Generator();
+		
+		for (int i = 0; i < numRecords; i++) {
+			MyUser user = generator.nextUser();
+			dataFileWriter.append(user);
+		}
+		
+		dataFileWriter.close();
+	}
+
+//	public static void main(String[] args) throws Exception {
+//		String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
+//		writeTestData(new File(testDataFile), 50);
+//	}
+	
+	public static void main(String[] args) throws Exception {
+		String inputPath = args[0];
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
+	
+		DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
+		
+		result.output(new DiscardingOutputFormat<Tuple2<String,MyUser>>());
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
new file mode 100644
index 0000000..f33f433
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.io.avro;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class AvroPojoTest extends MultipleProgramsTestBase {
+	public AvroPojoTest(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private File inFile;
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+		inFile = tempFolder.newFile();
+		AvroRecordInputFormatTest.writeTestFile(inFile);
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testSimpleAvroRead() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		DataSet<User> usersDS = env.createInput(users)
+				// null map type because the order changes in different JVMs (hard to test)
+				.map(new MapFunction<User, User>() {
+					@Override
+					public User map(User value) throws Exception {
+						value.setTypeMap(null);
+						return value;
+					}
+				});
+
+		usersDS.writeAsText(resultPath);
+
+		env.execute("Simple Avro read job");
+
+
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
+					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+	}
+
+	@Test
+	public void testSerializeWithAvro() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableForceAvro();
+		Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		DataSet<User> usersDS = env.createInput(users)
+				// null map type because the order changes in different JVMs (hard to test)
+				.map(new MapFunction<User, User>() {
+					@Override
+					public User map(User value) throws Exception {
+						Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1);
+						ab.put("hehe", 12L);
+						value.setTypeMap(ab);
+						return value;
+					}
+				});
+
+		usersDS.writeAsText(resultPath);
+
+		env.execute("Simple Avro read job");
+
+
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
+					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+	}
+
+	@Test
+	public void testKeySelection() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
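+		// with object reuse enabled, the same User instance may be handed to user functions repeatedly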
+		env.getConfig().enableObjectReuse();
+		Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		DataSet<User> usersDS = env.createInput(users);
+
+		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+			@Override
+			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+				for (User u : values) {
+					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+				}
+			}
+		});
+		res.writeAsText(resultPath);
+		env.execute("Avro Key selection");
+
+		expected = "(Alyssa,1)\n(Charlie,1)\n";
+	}
+
+	@Test
+	public void testWithAvroGenericSer() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
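+		// force the Avro serializer for the shuffled User records instead of the POJO serializer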
+		env.getConfig().enableForceAvro();
+		Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		DataSet<User> usersDS = env.createInput(users);
+
+		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
+			@Override
+			public String getKey(User value) throws Exception {
+				return String.valueOf(value.getName());
+			}
+		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+			@Override
+			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+				for (User u : values) {
+					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+				}
+			}
+		});
+
+		res.writeAsText(resultPath);
+		env.execute("Avro Key selection");
+
+		expected = "(Charlie,1)\n(Alyssa,1)\n";
+	}
+
+	@Test
+	public void testWithKryoGenericSer() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
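+		// enableForceKryo() routes POJO types through Kryo, exercising the generic serialization path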
+		env.getConfig().enableForceKryo();
+		Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		DataSet<User> usersDS = env.createInput(users);
+
+		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
+			@Override
+			public String getKey(User value) throws Exception {
+				return String.valueOf(value.getName());
+			}
+		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+			@Override
+			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+				for (User u : values) {
+					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+				}
+			}
+		});
+
+		res.writeAsText(resultPath);
+		env.execute("Avro Key selection");
+
+		expected = "(Charlie,1)\n(Alyssa,1)\n";
+	}
+
+	/**
+	 * Tests grouping on a few known fields.
+	 */
+	@Test
+	public void testAllFields() throws Exception {
+		for (String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
+			testField(fieldName);
+		}
+	}
+
+	private void testField(final String fieldName) throws Exception {
+		before();
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		DataSet<User> usersDS = env.createInput(users);
+
+		DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
+			@Override
+			public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
+				for (User u : values) {
+					out.collect(u.get(fieldName));
+				}
+			}
+		});
+		res.writeAsText(resultPath);
+		env.execute("Simple Avro read job");
+
+		// test if automatic registration of the Types worked
+		ExecutionConfig ec = env.getConfig();
+		Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class));
+
+		if (fieldName.equals("name")) {
+			expected = "Alyssa\nCharlie";
+		} else if (fieldName.equals("type_enum")) {
+			expected = "GREEN\nRED\n";
+		} else if (fieldName.equals("type_double_test")) {
+			expected = "123.45\n1.337\n";
+		} else {
+			Assert.fail("Unknown field");
+		}
+
+		after();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..91a9612
--- /dev/null
+++ b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.io.avro.generated.Address;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.typeutils.AvroTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the Avro input format.
+ * (The test case largely follows the Avro getting-started tutorial:
+ * http://avro.apache.org/docs/current/gettingstartedjava.html)
+ */
+public class AvroRecordInputFormatTest {
+	
+	public File testFile;
+	
+	static final String TEST_NAME = "Alyssa";
+
+	static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+	static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+
+	static final boolean TEST_ARRAY_BOOLEAN_1 = true;
+	static final boolean TEST_ARRAY_BOOLEAN_2 = false;
+
+	static final Colors TEST_ENUM_COLOR = Colors.GREEN;
+
+	static final String TEST_MAP_KEY1 = "KEY 1";
+	static final long TEST_MAP_VALUE1 = 8546456L;
+	static final String TEST_MAP_KEY2 = "KEY 2";
+	static final long TEST_MAP_VALUE2 = 17554L;
+
+	static final int TEST_NUM = 239;
+	static final String TEST_STREET = "Baker Street";
+	static final String TEST_CITY = "London";
+	static final String TEST_STATE = "London";
+	static final String TEST_ZIP = "NW1 6XE";
+	
+
+	private Schema userSchema = new User().getSchema();
+
+	public static void writeTestFile(File testFile) throws IOException {
+		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		stringArray.add(TEST_ARRAY_STRING_1);
+		stringArray.add(TEST_ARRAY_STRING_2);
+
+		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+
+		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+		
+		Address addr = new Address();
+		addr.setNum(TEST_NUM);
+		addr.setStreet(TEST_STREET);
+		addr.setCity(TEST_CITY);
+		addr.setState(TEST_STATE);
+		addr.setZip(TEST_ZIP);
+
+		User user1 = new User();
+
+		user1.setName(TEST_NAME);
+		user1.setFavoriteNumber(256);
+		user1.setTypeDoubleTest(123.45d);
+		user1.setTypeBoolTest(true);
+		user1.setTypeArrayString(stringArray);
+		user1.setTypeArrayBoolean(booleanArray);
+		user1.setTypeEnum(TEST_ENUM_COLOR);
+		user1.setTypeMap(longMap);
+		user1.setTypeNested(addr);
+
+		// Construct via builder
+		User user2 = User.newBuilder()
+				.setName("Charlie")
+				.setFavoriteColor("blue")
+				.setFavoriteNumber(null)
+				.setTypeBoolTest(false)
+				.setTypeDoubleTest(1.337d)
+				.setTypeNullTest(null)
+				.setTypeLongTest(1337L)
+				.setTypeArrayString(new ArrayList<CharSequence>())
+				.setTypeArrayBoolean(new ArrayList<Boolean>())
+				.setTypeNullableArray(null)
+				.setTypeEnum(Colors.RED)
+				.setTypeMap(new HashMap<CharSequence, Long>())
+				.setTypeFixed(null)
+				.setTypeUnion(null)
+				.setTypeNested(
+						Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
+								.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
+								.build())
+				.build();
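+
+		// write both records into a single Avro container file using the generated schema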
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		dataFileWriter.create(user1.getSchema(), testFile);
+		dataFileWriter.append(user1);
+		dataFileWriter.append(user2);
+		dataFileWriter.close();
+	}
+
+	@Before
+	public void createFiles() throws IOException {
+		testFile = File.createTempFile("AvroInputFormatTest", null);
+		writeTestFile(testFile);
+	}
+
+	/**
+	 * Tests whether the AvroInputFormat properly reads data from an Avro file.
+	 * @throws IOException if reading the test file fails
+	 */
+	@Test
+	public void testDeserialisation() throws IOException {
+		Configuration parameters = new Configuration();
+		
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		
+		format.configure(parameters);
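+		// the small test file should produce exactly one input split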
+		FileInputSplit[] splits = format.createInputSplits(1);
+		assertEquals(1, splits.length);
+		format.open(splits[0]);
+		
+		User u = format.nextRecord(null);
+		assertNotNull(u);
+		
+		String name = u.getName().toString();
+		assertNotNull("empty record", name);
+		assertEquals("name not equal", TEST_NAME, name);
+		
+		// check arrays
+		List<CharSequence> sl = u.getTypeArrayString();
+		assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+		assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+		
+		List<Boolean> bl = u.getTypeArrayBoolean();
+		assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+		assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+		
+		// check enums
+		Colors enumValue = u.getTypeEnum();
+		assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+		
+		// check maps
+		Map<CharSequence, Long> lm = u.getTypeMap();
+		assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+		assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+		
+		assertFalse("expecting second element", format.reachedEnd());
+		assertNotNull("expecting second element", format.nextRecord(u));
+		
+		assertNull(format.nextRecord(u));
+		assertTrue(format.reachedEnd());
+		
+		format.close();
+	}
+
+	/**
+	 * Tests whether the AvroInputFormat properly reads data when record reuse is disabled.
+	 * @throws IOException if reading the test file fails
+	 */
+	@Test
+	public void testDeserialisationReuseAvroRecordFalse() throws IOException {
+		Configuration parameters = new Configuration();
+		
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
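+		// setReuseAvroValue(false) makes the format hand out a fresh record for every nextRecord() call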
+		format.setReuseAvroValue(false);
+		
+		format.configure(parameters);
+		FileInputSplit[] splits = format.createInputSplits(1);
+		assertEquals(1, splits.length);
+		format.open(splits[0]);
+		
+		User u = format.nextRecord(null);
+		assertNotNull(u);
+		
+		String name = u.getName().toString();
+		assertNotNull("empty record", name);
+		assertEquals("name not equal", TEST_NAME, name);
+		
+		// check arrays
+		List<CharSequence> sl = u.getTypeArrayString();
+		assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+		assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+		
+		List<Boolean> bl = u.getTypeArrayBoolean();
+		assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+		assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+		
+		// check enums
+		Colors enumValue = u.getTypeEnum();
+		assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+		
+		// check maps
+		Map<CharSequence, Long> lm = u.getTypeMap();
+		assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+		assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+		
+		assertFalse("expecting second element", format.reachedEnd());
+		assertNotNull("expecting second element", format.nextRecord(u));
+		
+		assertNull(format.nextRecord(u));
+		assertTrue(format.reachedEnd());
+		
+		format.close();
+	}
+
+	/**
+	 * Tests whether Flink's serialization stack can properly process GenericData.Record types.
+	 * Usually, users of Avro generate classes (POJOs) from Avro schemas.
+	 * However, if generated classes are not available, one can also use GenericData.Record,
+	 * an untyped key-value record that uses a schema to validate the correctness of the data.
+	 *
+	 * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
+	 */
+	@Test
+	public void testDeserializeToGenericType() throws IOException {
+		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
+
+		try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
+			// initialize the Record by reading it from disk (that's easier than creating it by hand)
+			GenericData.Record rec = new GenericData.Record(userSchema);
+			dataFileReader.next(rec);
+			
+			// check if record has been read correctly
+			assertNotNull(rec);
+			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+			assertNull(rec.get("type_long_test")); // it is null for the first record.
+
+			// now serialize it with our framework:
+			TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);
+
+			ExecutionConfig ec = new ExecutionConfig();
+			Assert.assertEquals(GenericTypeInfo.class, te.getClass());
+			
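+			// walks the type and registers Avro-specific Kryo serializers (e.g. for Schema) on the config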
+			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
+
+			TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
+			Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
+			Assert.assertTrue(
+					ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
+							ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
+
+			ByteArrayOutputStream out = new ByteArrayOutputStream();
+			try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
+				tser.serialize(rec, outView);
+			}
+
+			GenericData.Record newRec;
+			try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
+					new ByteArrayInputStream(out.toByteArray())))
+			{
+				newRec = tser.deserialize(inView);
+			}
+
+			// check if it is still the same
+			assertNotNull(newRec);
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
+			assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
+			assertNull(newRec.get("type_long_test"));
+		}
+	}
+		
+	/**
+	 * This test validates proper serialization with specific (generated POJO) types.
+	 */
+	@Test
+	public void testDeserializeToSpecificType() throws IOException {
+
+		DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
+
+		try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
+			User rec = dataFileReader.next();
+
+			// check if record has been read correctly
+			assertNotNull(rec);
+			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+
+			// now serialize it with our framework:
+			ExecutionConfig ec = new ExecutionConfig();
+			TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class);
+
+			Assert.assertEquals(AvroTypeInfo.class, te.getClass());
+			TypeSerializer<User> tser = te.createSerializer(ec);
+
+			ByteArrayOutputStream out = new ByteArrayOutputStream();
+			try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
+				tser.serialize(rec, outView);
+			}
+
+			User newRec;
+			try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
+					new ByteArrayInputStream(out.toByteArray())))
+			{
+				newRec = tser.deserialize(inView);
+			}
+
+			// check if it is still the same
+			assertNotNull(newRec);
+			assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
+		}
+	}
+
+	/**
+	 * Tests whether the AvroInputFormat is able to properly read data from an Avro
+	 * file as a GenericRecord.
+	 * 
+	 * @throws IOException
+	 *             if reading the file fails
+	 */
+	@Test
+	public void testDeserialisationGenericRecord() throws IOException {
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
+				GenericRecord.class);
+
+		doTestDeserializationGenericRecord(format, parameters);
+	}
+
+	/**
+	 * Helper method to test GenericRecord deserialization.
+	 * 
+	 * @param format
+	 *            the format to test
+	 * @param parameters
+	 *            the configuration to use
+	 * @throws IOException
+	 *             thrown if there is an issue
+	 */
+	@SuppressWarnings("unchecked")
+	private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format,
+			final Configuration parameters) throws IOException {
+		try {
+			format.configure(parameters);
+			FileInputSplit[] splits = format.createInputSplits(1);
+			assertEquals(1, splits.length);
+			format.open(splits[0]);
+
+			GenericRecord u = format.nextRecord(null);
+			assertNotNull(u);
+			assertEquals("The schemas should be equal", userSchema, u.getSchema());
+
+			String name = u.get("name").toString();
+			assertNotNull("empty record", name);
+			assertEquals("name not equal", TEST_NAME, name);
+
+			// check arrays
+			List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string");
+			assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+			assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+
+			List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean");
+			assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+			assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+
+			// check enums
+			GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum");
+			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString());
+
+			// check maps
+			Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map");
+			assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+			assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+
+			assertFalse("expecting second element", format.reachedEnd());
+			assertNotNull("expecting second element", format.nextRecord(u));
+
+			assertNull(format.nextRecord(u));
+			assertTrue(format.reachedEnd());
+		} finally {
+			format.close();
+		}
+	}
+
+	/**
+	 * Tests whether the AvroInputFormat is able to properly read data from an Avro
+	 * file as a GenericRecord when record reuse is disabled.
+	 * 
+	 * @throws IOException
+	 *             if reading the file fails
+	 */
+	@Test
+	public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
+				GenericRecord.class);
+		format.configure(parameters);
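+		// disable record reuse: each nextRecord() call should yield a new GenericRecord instance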
+		format.setReuseAvroValue(false);
+
+		doTestDeserializationGenericRecord(format, parameters);
+	}
+
+	@After
+	public void deleteFiles() {
+		testFile.delete();
+	}
+
+}