You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:49 UTC

[70/73] [abbrv] prefix all projects in addons and quickstarts with flink-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
new file mode 100644
index 0000000..dc06c09
--- /dev/null
+++ b/flink-addons/flink-avro/pom.xml
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+	
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<artifactId>flink-addons</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>0.6-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-avro</artifactId>
+	<name>flink-avro</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>1.7.5</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro-mapred</artifactId>
+			<version>1.7.5</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		
+	</dependencies>
+
+	<build>
+		<plugins>
+		<!-- Exclude ExternalJar contents from regular build -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<excludes>
+						<exclude>**/src/test/java/org/apache/flink/api/avro/testjar/*.java</exclude>
+					</excludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>create-test-dependency</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.api.avro.testjar.AvroExternalJarProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>maven</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											org.apache.maven.plugins
+										</groupId>
+										<artifactId>
+											maven-assembly-plugin
+										</artifactId>
+										<versionRange>
+											[2.4,)
+										</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
+	<profiles>
+		<profile>
+			<!-- A bug in Java 6 causes the Javadoc generation to fail,
+			apparently because the test cases reference JUnit.
+			See https://github.com/stratosphere/stratosphere/pull/405#issuecomment-32591978
+			for further links. -->
+			<id>disable-javadocs-in-java6</id>
+			<activation>
+				 <jdk>(,1.6]</jdk> <!-- disable javadocs for java6 or lower. -->
+			</activation>
+			<properties>
+				<maven.javadoc.skip>true</maven.javadoc.skip>
+			</properties>
+		</profile>
+	</profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
new file mode 100644
index 0000000..8b58a98
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Key;
+import org.apache.flink.util.ReflectionUtil;
+
+
+/**
+ * Base class for key types that wrap an Avro datum of type {@code T}.
+ *
+ * <p>The wrapped datum is serialized with Avro's reflect-based reader/writer on top of
+ * {@link DataOutputEncoder} / {@link DataInputDecoder}. A leading boolean flag records
+ * whether the datum is present ({@code true}) or {@code null} ({@code false}).
+ *
+ * <p>Hashing, equality and ordering are delegated to the wrapped datum; a {@code null}
+ * datum hashes to 0 and sorts before any non-null datum.
+ *
+ * @param <T> the type of the wrapped Avro datum
+ */
+public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
+	
+	private static final long serialVersionUID = 1L;
+
+
+	/** Creates a value without a datum. */
+	public AvroBaseValue() {}
+	
+	/**
+	 * Creates a value wrapping the given datum.
+	 *
+	 * @param datum the datum to wrap
+	 */
+	public AvroBaseValue(T datum) {
+		super(datum);
+	}
+
+	
+	// --------------------------------------------------------------------------------------------
+	//  Serialization / Deserialization
+	// --------------------------------------------------------------------------------------------
+	
+	// Lazily created helpers. They are pure caches that the getters below rebuild on demand,
+	// so they are transient: Java serialization must not try to write the Avro reader/writer,
+	// which are not serializable.
+	private transient ReflectDatumWriter<T> writer;
+	private transient ReflectDatumReader<T> reader;
+	
+	private transient DataOutputEncoder encoder;
+	private transient DataInputDecoder decoder;
+	
+	
+	/**
+	 * Writes the null flag followed, for a non-null datum, by the Avro-encoded datum.
+	 *
+	 * @param out the view to write to
+	 * @throws IOException if writing to the view or encoding the datum fails
+	 */
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		// the null flag
+		if (datum() == null) {
+			out.writeBoolean(false);
+		} else {
+			out.writeBoolean(true);
+			
+			DataOutputEncoder encoder = getEncoder();
+			encoder.setOut(out);
+			getWriter().write(datum(), encoder);
+		}
+	}
+
+	/**
+	 * Reads the null flag and, if it is set, the Avro-encoded datum. The current datum is
+	 * offered to the Avro reader for reuse. If the flag signals a {@code null} datum, the
+	 * current datum is cleared so that a reused instance does not keep a stale value.
+	 *
+	 * @param in the view to read from
+	 * @throws IOException if reading from the view or decoding the datum fails
+	 */
+	@Override
+	public void read(DataInputView in) throws IOException {
+		// the null flag
+		if (in.readBoolean()) {
+			DataInputDecoder decoder = getDecoder();
+			decoder.setIn(in);
+			datum(getReader().read(datum(), decoder));
+		} else {
+			// the serialized value was null; drop whatever this instance held before
+			datum(null);
+		}
+	}
+	
+	/**
+	 * Returns the cached datum writer, creating it from the runtime class of the current
+	 * datum on first use. Must only be called while the datum is non-null.
+	 */
+	private ReflectDatumWriter<T> getWriter() {
+		if (this.writer == null) {
+			@SuppressWarnings("unchecked")
+			Class<T> clazz = (Class<T>) datum().getClass();
+			this.writer = new ReflectDatumWriter<T>(clazz);
+		}
+		return this.writer;
+	}
+	
+	/**
+	 * Returns the cached datum reader, creating it on first use for the class obtained from
+	 * {@code ReflectionUtil.getTemplateType1(getClass())} (presumably the first type argument
+	 * of the concrete subclass - confirm against ReflectionUtil).
+	 */
+	private ReflectDatumReader<T> getReader() {
+		if (this.reader == null) {
+			Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
+			this.reader = new ReflectDatumReader<T>(datumClass);
+		}
+		return this.reader;
+	}
+	
+	/** Returns the cached encoder, creating it on first use. */
+	private DataOutputEncoder getEncoder() {
+		if (this.encoder == null) {
+			this.encoder = new DataOutputEncoder();
+		}
+		return this.encoder;
+	}
+	
+	/** Returns the cached decoder, creating it on first use. */
+	private DataInputDecoder getDecoder() {
+		if (this.decoder == null) {
+			this.decoder = new DataInputDecoder();
+		}
+		return this.decoder;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Hashing / Equality
+	// --------------------------------------------------------------------------------------------
+	
+	/** Returns the hash code of the wrapped datum, or 0 if the datum is {@code null}. */
+	@Override
+	public int hashCode() {
+		return datum() == null ? 0 : datum().hashCode();
+	}
+	
+	/**
+	 * Two values are equal if they are of exactly the same class and their datums are
+	 * either both {@code null} or equal to each other. Returns {@code false} for
+	 * {@code null}, as required by the {@link Object#equals(Object)} contract.
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		if (obj == null || obj.getClass() != this.getClass()) {
+			return false;
+		}
+		
+		Object otherDatum = ((AvroBaseValue<?>) obj).datum();
+		Object thisDatum = datum();
+		
+		if (thisDatum == null) {
+			return otherDatum == null;
+		} else {
+			return thisDatum.equals(otherDatum);
+		}
+	}
+	
+	@Override
+	public String toString() {
+		return "AvroBaseValue (" + datum() + ")";
+	}
+	
+	/**
+	 * Compares the wrapped datums. A {@code null} datum is smaller than any non-null datum.
+	 * Non-null datums are compared through {@link Comparable}; a datum that does not
+	 * implement it causes a {@link ClassCastException}.
+	 *
+	 * @param o the value to compare with
+	 * @return a negative number, zero, or a positive number if this value is smaller than,
+	 *         equal to, or larger than the other value
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public int compareTo(AvroBaseValue<T> o) {
+		Object otherDatum = o.datum();
+		Object thisDatum = datum();
+		
+		if (thisDatum == null) {
+			return otherDatum == null ? 0 : -1;
+		} else {
+			return otherDatum == null ? 1: ((Comparable<Object>) thisDatum).compareTo(otherDatum);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
new file mode 100644
index 0000000..3c9fd34
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataInputDecoder.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Decoder;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * An Avro {@link Decoder} that reads from a {@link DataInput}.
+ *
+ * <p>Primitives are read with the corresponding {@code DataInput} methods (for example,
+ * ints and longs are read via {@code readInt()} / {@code readLong()}). Byte sequences and
+ * strings are prefixed by an int length. Block counts of arrays and maps are read as
+ * variable-length longs, see {@link #readVarLongCount(DataInput)}.
+ *
+ * <p>The input must be set via {@link #setIn(DataInput)} before any read method is called.
+ */
+public class DataInputDecoder extends Decoder {
+	
+	/** Reusable buffer backing {@link #readString()}. */
+	private final Utf8 stringDecoder = new Utf8();
+	
+	private DataInput in;
+	
+	/**
+	 * Sets the input that all subsequent read calls consume from.
+	 *
+	 * @param in the input to read from
+	 */
+	public void setIn(DataInput in) {
+		this.in = in;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// primitives
+	// --------------------------------------------------------------------------------------------
+	
+	/** Null values occupy no bytes, so nothing is read. */
+	@Override
+	public void readNull() {}
+	
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		return in.readBoolean();
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		return in.readInt();
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		return in.readLong();
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return in.readFloat();
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return in.readDouble();
+	}
+	
+	/** Enum ordinals are encoded as plain ints. */
+	@Override
+	public int readEnum() throws IOException {
+		return readInt();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// bytes
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void readFixed(byte[] bytes, int start, int length) throws IOException {
+		in.readFully(bytes, start, length);
+	}
+	
+	/**
+	 * Reads an int length followed by that many bytes. The given buffer is reused if it is
+	 * array-backed and large enough; otherwise a new buffer is allocated.
+	 *
+	 * @param old a buffer that may be reused, or {@code null}
+	 * @return a buffer positioned at 0 whose limit is the number of bytes read
+	 */
+	@Override
+	public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+		int length = readInt();
+		ByteBuffer result;
+		if (old != null && length <= old.capacity() && old.hasArray()) {
+			result = old;
+			result.clear();
+		} else {
+			result = ByteBuffer.allocate(length);
+		}
+		in.readFully(result.array(), result.arrayOffset() + result.position(), length);
+		result.limit(length);
+		return result;
+	}
+	
+	
+	@Override
+	public void skipFixed(int length) throws IOException {
+		skipBytes(length);
+	}
+	
+	/** Skips a length-prefixed byte sequence. */
+	@Override
+	public void skipBytes() throws IOException {
+		int num = readInt();
+		skipBytes(num);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// strings
+	// --------------------------------------------------------------------------------------------
+	
+	
+	/**
+	 * Reads an int byte length followed by that many bytes of string data.
+	 *
+	 * @param old a {@link Utf8} instance to reuse, or {@code null} to create a new one
+	 * @return the (possibly reused) {@link Utf8} holding the bytes read
+	 */
+	@Override
+	public Utf8 readString(Utf8 old) throws IOException {
+		int length = readInt();
+		Utf8 result = (old != null ? old : new Utf8());
+		result.setByteLength(length);
+		
+		if (length > 0) {
+			in.readFully(result.getBytes(), 0, length);
+		}
+		
+		return result;
+	}
+
+	@Override
+	public String readString() throws IOException {
+		return readString(stringDecoder).toString();
+	}
+
+	/** Skips a length-prefixed string. */
+	@Override
+	public void skipString() throws IOException {
+		int len = readInt();
+		skipBytes(len);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// collection types
+	// --------------------------------------------------------------------------------------------
+
+	// All six collection methods read the next variable-length block count from the input.
+
+	@Override
+	public long readArrayStart() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long arrayNext() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long skipArray() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long readMapStart() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long mapNext() throws IOException {
+		return readVarLongCount(in);
+	}
+
+	@Override
+	public long skipMap() throws IOException {
+		return readVarLongCount(in);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// union
+	// --------------------------------------------------------------------------------------------
+
+	/** Union branch indexes are encoded as plain ints. */
+	@Override
+	public int readIndex() throws IOException {
+		return readInt();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// utils
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Skips exactly {@code num} bytes of the input.
+	 *
+	 * <p>{@link DataInput#skipBytes(int)} may skip fewer bytes than requested, possibly zero,
+	 * and never signals end of input. To guarantee progress, a skip of zero bytes is followed
+	 * by reading a single byte, which throws {@link java.io.EOFException} when the input is
+	 * exhausted instead of looping forever.
+	 *
+	 * @param num the number of bytes to skip; non-positive values skip nothing
+	 * @throws IOException if the input ends before {@code num} bytes were skipped, or if
+	 *                     reading fails
+	 */
+	private void skipBytes(int num) throws IOException {
+		while (num > 0) {
+			int skipped = in.skipBytes(num);
+			if (skipped > 0) {
+				num -= skipped;
+			} else {
+				// no progress was made: consume one byte explicitly (fails at end of input)
+				in.readByte();
+				num--;
+			}
+		}
+	}
+	
+	/**
+	 * Reads a non-negative count encoded in a variable number of bytes. Each byte carries
+	 * seven payload bits, least significant group first; a set high bit means that another
+	 * byte follows.
+	 *
+	 * @param in the input to read from
+	 * @return the decoded count
+	 * @throws IOException if reading from the input fails
+	 */
+	public static long readVarLongCount(DataInput in) throws IOException {
+		long value = in.readUnsignedByte();
+
+		if ((value & 0x80) == 0) {
+			return value;
+		}
+		else {
+			long curr;
+			int shift = 7;
+			value = value & 0x7f;
+			while (((curr = in.readUnsignedByte()) & 0x80) != 0){
+				value |= (curr & 0x7f) << shift;
+				shift += 7;
+			}
+			value |= curr << shift;
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
new file mode 100644
index 0000000..5463237
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/DataOutputEncoder.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.io.Encoder;
+import org.apache.avro.util.Utf8;
+
+
+/**
+ * An Avro {@link Encoder} that writes to a {@link DataOutput}.
+ *
+ * <p>Primitives are written with the corresponding {@code DataOutput} methods. Byte
+ * sequences and strings are prefixed by an int length. Block counts of arrays and maps
+ * are written as variable-length longs, see {@link #writeVarLongCount(DataOutput, long)},
+ * and each array or map is terminated by a single zero byte.
+ *
+ * <p>The output must be set via {@link #setOut(DataOutput)} before any write method is
+ * called.
+ */
+public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
+	
+	private static final long serialVersionUID = 1L;
+	
+	// The target is per-use state handed in through setOut(). It is transient because a
+	// DataOutput is generally not serializable, so it must not become part of the
+	// serialized form of this encoder.
+	private transient DataOutput out;
+	
+	
+	/**
+	 * Sets the output that all subsequent write calls go to.
+	 *
+	 * @param out the output to write to
+	 */
+	public void setOut(DataOutput out) {
+		this.out = out;
+	}
+
+
+	/** Nothing is buffered in this encoder, so there is nothing to flush. */
+	@Override
+	public void flush() throws IOException {}
+
+	// --------------------------------------------------------------------------------------------
+	// primitives
+	// --------------------------------------------------------------------------------------------
+	
+	/** Null values occupy no bytes, so nothing is written. */
+	@Override
+	public void writeNull() {}
+	
+
+	@Override
+	public void writeBoolean(boolean b) throws IOException {
+		out.writeBoolean(b);
+	}
+
+	@Override
+	public void writeInt(int n) throws IOException {
+		out.writeInt(n);
+	}
+
+	@Override
+	public void writeLong(long n) throws IOException {
+		out.writeLong(n);
+	}
+
+	@Override
+	public void writeFloat(float f) throws IOException {
+		out.writeFloat(f);
+	}
+
+	@Override
+	public void writeDouble(double d) throws IOException {
+		out.writeDouble(d);
+	}
+	
+	/** Enum ordinals are encoded as plain ints. */
+	@Override
+	public void writeEnum(int e) throws IOException {
+		out.writeInt(e);
+	}
+	
+	
+	// --------------------------------------------------------------------------------------------
+	// bytes
+	// --------------------------------------------------------------------------------------------
+
+	/** Writes the given bytes without a length prefix. */
+	@Override
+	public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+		out.write(bytes, start, len);
+	}
+	
+	/** Writes an int length followed by the given bytes. */
+	@Override
+	public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+		out.writeInt(len);
+		if (len > 0) {
+			out.write(bytes, start, len);
+		}
+	}
+	
+	/** Writes an int length followed by the remaining bytes of the buffer. */
+	@Override
+	public void writeBytes(ByteBuffer bytes) throws IOException {
+		int num = bytes.remaining();
+		out.writeInt(num);
+		
+		if (num > 0) {
+			writeFixed(bytes);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// strings
+	// --------------------------------------------------------------------------------------------
+
+	/** Writes the string as a length-prefixed byte sequence obtained from {@link Utf8#getBytesFor(String)}. */
+	@Override
+	public void writeString(String str) throws IOException {
+		byte[] bytes = Utf8.getBytesFor(str);
+		writeBytes(bytes, 0, bytes.length);
+	}
+	
+	/** Writes the valid bytes of the given {@link Utf8} as a length-prefixed byte sequence. */
+	@Override
+	public void writeString(Utf8 utf8) throws IOException {
+		writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
+		
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// collection types
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void writeArrayStart() {}
+
+	/**
+	 * Writes the item count of the next block. A count of zero is not written here, because
+	 * the end marker written by {@link #writeArrayEnd()} / {@link #writeMapEnd()} already is
+	 * a zero count.
+	 */
+	@Override
+	public void setItemCount(long itemCount) throws IOException {
+		if (itemCount > 0) {
+			writeVarLongCount(out, itemCount);
+		}
+	}
+
+	@Override
+	public void startItem() {}
+
+	@Override
+	public void writeArrayEnd() throws IOException {
+		// write a single byte 0, shortcut for a var-length long of 0
+		out.write(0);
+	}
+
+	@Override
+	public void writeMapStart() {}
+
+	@Override
+	public void writeMapEnd() throws IOException {
+		// write a single byte 0, shortcut for a var-length long of 0
+		out.write(0);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// union
+	// --------------------------------------------------------------------------------------------
+	
+	/** Union branch indexes are encoded as plain ints. */
+	@Override
+	public void writeIndex(int unionIndex) throws IOException {
+		out.writeInt(unionIndex);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// utils
+	// --------------------------------------------------------------------------------------------
+		
+	
+	/**
+	 * Writes a non-negative count in a variable number of bytes. Each byte carries seven
+	 * payload bits, least significant group first; the high bit is set on every byte except
+	 * the last one.
+	 *
+	 * @param out the output to write to
+	 * @param val the count to write; must not be negative
+	 * @throws IOException if {@code val} is negative or writing to the output fails
+	 */
+	public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
+		if (val < 0) {
+			throw new IOException("Illegal count (must be non-negative): " + val);
+		}
+		
+		// DataOutput.write(int) only writes the low eight bits of its argument
+		while ((val & ~0x7FL) != 0) {
+			out.write(((int) val) | 0x80);
+			val >>>= 7;
+		}
+		out.write((int) val);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
new file mode 100644
index 0000000..cb4a739
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/FSDataInputStreamWrapper.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.avro;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.avro.file.SeekableInput;
+import org.apache.flink.core.fs.FSDataInputStream;
+
+
+/**
+ * Adapts an {@link FSDataInputStream} to Avro's {@link SeekableInput}.
+ *
+ * <p>Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well).
+ *
+ * <p>The wrapper keeps track of the position in the data stream itself: the position is
+ * advanced by the number of bytes returned from {@link #read(byte[], int, int)} and reset
+ * by {@link #seek(long)}.
+ */
+public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
+	private final FSDataInputStream stream;
+	private final long len;
+	private long pos;
+
+	/**
+	 * Creates a wrapper around the given stream, starting at position 0.
+	 *
+	 * @param stream the stream to read from
+	 * @param len the length reported by {@link #length()}
+	 */
+	public FSDataInputStreamWrapper(FSDataInputStream stream, long len) {
+		this.stream = stream;
+		this.len = len;
+		this.pos = 0;
+	}
+
+	/** Returns the length that was passed to the constructor. */
+	@Override
+	public long length() {
+		return len;
+	}
+
+	/**
+	 * Reads up to {@code len} bytes into {@code b} and advances the tracked position by the
+	 * number of bytes actually read.
+	 *
+	 * @return the number of bytes read, or the stream's end-of-stream value (-1)
+	 * @throws IOException if the underlying stream fails
+	 */
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		int read = stream.read(b, off, len);
+		// the end-of-stream marker (-1) must not move the tracked position backwards
+		if (read > 0) {
+			pos += read;
+		}
+		return read;
+	}
+
+	/** Seeks the underlying stream to {@code p} and records it as the current position. */
+	@Override
+	public void seek(long p) throws IOException {
+		stream.seek(p);
+		pos = p;
+	}
+
+	/** Returns the tracked position in the stream. */
+	@Override
+	public long tell() throws IOException {
+		return pos;
+	}
+
+	/** Closes the underlying stream. */
+	@Override
+	public void close() throws IOException {
+		stream.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
new file mode 100644
index 0000000..1031d81
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+
+/**
+ * A file input format that reads records of type {@code E} from Avro data files.
+ *
+ * <p>Types extending {@code SpecificRecordBase} are read with a {@link SpecificDatumReader},
+ * all other types with a {@link ReflectDatumReader}. The format marks itself as
+ * unsplittable in its constructor.
+ *
+ * @param <E> the type of the records produced by this format
+ */
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
+	
+	private static final long serialVersionUID = 1L;
+
+	private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
+	
+	
+	private final Class<E> avroValueType;
+	
+	private boolean reuseAvroValue = true;
+	
+
+	private transient FileReader<E> dataFileReader;
+
+	
+	/**
+	 * Creates an input format for the given file and record type.
+	 *
+	 * @param filePath the path of the Avro file(s) to read
+	 * @param type the class of the records to produce
+	 */
+	public AvroInputFormat(Path filePath, Class<E> type) {
+		super(filePath);
+		this.avroValueType = type;
+		this.unsplittable = true;
+	}
+	
+	
+	/**
+	 * Sets the flag whether to reuse the Avro value instance for all records.
+	 * By default, the input format reuses the Avro value.
+	 *
+	 * @param reuseAvroValue True, if the input format should reuse the Avro value instance, false otherwise.
+	 */
+	public void setReuseAvroValue(boolean reuseAvroValue) {
+		this.reuseAvroValue = reuseAvroValue;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Typing
+	// --------------------------------------------------------------------------------------------
+	
+	/** Returns the type information extracted from the record class given to the constructor. */
+	@Override
+	public TypeInformation<E> getProducedType() {
+		return TypeExtractor.getForClass(this.avroValueType);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Input Format Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Opens the split, creates the Avro file reader on top of the opened stream and
+	 * syncs it to the start of the split.
+	 *
+	 * @param split the split to open
+	 * @throws IOException if the split cannot be opened or the Avro file cannot be read
+	 */
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+
+		DatumReader<E> datumReader;
+		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
+			datumReader = new SpecificDatumReader<E>(avroValueType);
+		} else {
+			datumReader = new ReflectDatumReader<E>(avroValueType);
+		}
+		
+		LOG.info("Opening split " + split);
+		
+		// pass the length as a long: narrowing it to int would corrupt lengths of 2 GiB or more
+		SeekableInput in = new FSDataInputStreamWrapper(stream, split.getLength());
+		
+		dataFileReader = DataFileReader.openReader(in, datumReader);
+		dataFileReader.sync(split.getStart());
+	}
+
+	/** Returns {@code true} once the Avro reader has no more records. */
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return !dataFileReader.hasNext();
+	}
+
+	/**
+	 * Reads the next record.
+	 *
+	 * @param reuseValue the instance handed to the Avro reader for reuse; it is replaced by
+	 *                   a freshly instantiated object if value reuse is switched off
+	 * @return the record read, or {@code null} if there are no more records
+	 * @throws IOException if reading the record fails
+	 */
+	@Override
+	public E nextRecord(E reuseValue) throws IOException {
+		if (!dataFileReader.hasNext()) {
+			return null;
+		}
+		
+		if (!reuseAvroValue) {
+			reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class);
+		}
+		
+		reuseValue = dataFileReader.next(reuseValue);
+		return reuseValue;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
new file mode 100644
index 0000000..56c8214
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroOutputFormat.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.io;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * A file output format that writes records of type {@code E} to an Avro data file.
+ *
+ * <p>Types extending {@code SpecificRecordBase} are written with a
+ * {@link SpecificDatumWriter} and the schema of a new instance of the type; all other types
+ * are written with a {@link ReflectDatumWriter} and the schema derived by
+ * {@link ReflectData}. A schema set via {@link #setSchema(Schema)} takes precedence over
+ * the derived one when the file is created.
+ *
+ * @param <E> the type of the records consumed by this format
+ */
+public class AvroOutputFormat<E> extends FileOutputFormat<E> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final Class<E> avroValueType;
+
+	private Schema userDefinedSchema = null;
+
+	private transient DataFileWriter<E> dataFileWriter;
+
+
+	/**
+	 * Creates an output format writing to the given path.
+	 *
+	 * @param filePath the path of the file to write
+	 * @param type the class of the records to write
+	 */
+	public AvroOutputFormat(Path filePath, Class<E> type) {
+		super(filePath);
+		this.avroValueType = type;
+	}
+
+	/**
+	 * Creates an output format without an output path.
+	 *
+	 * @param type the class of the records to write
+	 */
+	public AvroOutputFormat(Class<E> type) {
+		this.avroValueType = type;
+	}
+
+	/**
+	 * Sets the schema to create the Avro file with, instead of the schema derived from
+	 * the record type.
+	 *
+	 * @param schema the schema to use, or {@code null} to use the derived schema
+	 */
+	public void setSchema(Schema schema) {
+		this.userDefinedSchema = schema;
+	}
+
+	/** Appends the record to the Avro file. */
+	@Override
+	public void writeRecord(E record) throws IOException {
+		dataFileWriter.append(record);
+	}
+
+	/**
+	 * Opens the output stream and creates the Avro file writer on top of it.
+	 *
+	 * @throws IOException if the stream cannot be opened or the file header cannot be written
+	 * @throws RuntimeException if a specific record type cannot be instantiated to obtain
+	 *                          its schema; the original exception is attached as the cause
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		super.open(taskNumber, numTasks);
+		DatumWriter<E> datumWriter;
+		Schema schema = null;
+		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
+			datumWriter = new SpecificDatumWriter<E>(avroValueType);
+			try {
+				schema = ((org.apache.avro.specific.SpecificRecordBase)avroValueType.newInstance()).getSchema();
+			} catch (InstantiationException e) {
+				// keep the cause so that the stack trace of the failed instantiation is not lost
+				throw new RuntimeException(e.getMessage(), e);
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException(e.getMessage(), e);
+			}
+		} else {
+			datumWriter = new ReflectDatumWriter<E>(avroValueType);
+			schema = ReflectData.get().getSchema(avroValueType);
+		}
+		dataFileWriter = new DataFileWriter<E>(datumWriter);
+		if (userDefinedSchema == null) {
+			dataFileWriter.create(schema, stream);
+		} else {
+			dataFileWriter.create(userDefinedSchema, stream);
+		}
+	}
+
+	/**
+	 * Flushes and closes the Avro writer, then closes the output format. The writer is
+	 * skipped if it was never created (for example because {@code open} failed early), and
+	 * {@code super.close()} runs even if flushing or closing the writer fails.
+	 *
+	 * @throws IOException if flushing or closing fails
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			if (dataFileWriter != null) {
+				dataFileWriter.flush();
+				dataFileWriter.close();
+			}
+		} finally {
+			// drop the writer so that a repeated close() does not touch it again
+			dataFileWriter = null;
+			super.close();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
new file mode 100644
index 0000000..ab96895
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.record.io.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.avro.AvroBaseValue;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.java.record.io.FileInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.ReflectionUtil;
+
+
+/**
+ * File input format that reads Avro data files and hands out each datum wrapped in an
+ * {@link AvroBaseValue}, stored as field 0 of the emitted {@link Record}.
+ *
+ * @param <E> the Avro datum type
+ */
+public class AvroInputFormat<E> extends FileInputFormat {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);
+
+	/** Wrapper class; one instance is created each time a split is opened. */
+	private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
+
+	/** Datum class; decides between the specific and the reflect datum reader. */
+	private final Class<E> avroValueType;
+
+	private transient FileReader<E> dataFileReader;
+
+	/** Datum handed back to the reader on every call so it can be reused. */
+	private transient E reuseAvroValue;
+
+	private transient AvroBaseValue<E> wrapper;
+
+	/**
+	 * Creates a format whose datum type is taken from the wrapper class via
+	 * {@code ReflectionUtil.getTemplateType1}.
+	 *
+	 * @param wrapperClass the wrapper class to instantiate for emitted values
+	 */
+	public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
+		this.avroWrapperTypeClass = wrapperClass;
+		this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
+		this.unsplittable = true;
+	}
+
+	/**
+	 * Creates a format with an explicitly given datum type.
+	 *
+	 * @param wrapperClass the wrapper class to instantiate for emitted values
+	 * @param avroType the Avro datum class
+	 */
+	public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
+		this.avroWrapperTypeClass = wrapperClass;
+		this.avroValueType = avroType;
+		this.unsplittable = true;
+	}
+
+	/**
+	 * Opens the split, creates the wrapper and the Avro file reader, and asks the reader
+	 * to sync to the split's start offset.
+	 */
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+
+		this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
+
+		final boolean isSpecificRecord =
+				org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType);
+		final DatumReader<E> reader;
+		if (!isSpecificRecord) {
+			reader = new ReflectDatumReader<E>(avroValueType);
+		} else {
+			reader = new SpecificDatumReader<E>(avroValueType);
+		}
+
+		LOG.info("Opening split " + split);
+
+		// NOTE(review): the split length is narrowed to int here
+		final SeekableInput seekable = new FSDataInputStreamWrapper(stream, (int) split.getLength());
+
+		this.dataFileReader = DataFileReader.openReader(seekable, reader);
+		this.dataFileReader.sync(split.getStart());
+
+		// start each split without a reusable datum
+		this.reuseAvroValue = null;
+	}
+
+	/** Reports whether the Avro reader has no further datum. */
+	@Override
+	public boolean reachedEnd() throws IOException {
+		final boolean hasMore = dataFileReader.hasNext();
+		return !hasMore;
+	}
+
+	/**
+	 * Reads the next datum, puts it into the wrapper and stores the wrapper as field 0.
+	 *
+	 * @return the given record, or {@code null} when the reader has no further datum
+	 */
+	@Override
+	public Record nextRecord(Record record) throws IOException {
+		if (dataFileReader.hasNext()) {
+			reuseAvroValue = dataFileReader.next(reuseAvroValue);
+			wrapper.datum(reuseAvroValue);
+			record.setField(0, wrapper);
+			return record;
+		}
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
new file mode 100644
index 0000000..1464ca9
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.record.io.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.avro.FSDataInputStreamWrapper;
+import org.apache.flink.api.java.record.io.FileInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.ListValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.MapValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+
+/**
+ * Input format to read Avro files.
+ * 
+ * The input format currently supports only flat Avro schemas. So there is no
+ * support for complex types except for nullable primitive fields, e.g.
+ * ["string", "null"] (see
+ * http://avro.apache.org/docs/current/spec.html#schema_complex)
+ * 
+ */
+public class AvroRecordInputFormat extends FileInputFormat {
+	private static final long serialVersionUID = 1L;
+
+	private static final Log LOG = LogFactory.getLog(AvroRecordInputFormat.class);
+
+	private FileReader<GenericRecord> dataFileReader;
+	private GenericRecord reuseAvroRecord = null;
+
+	/**
+	 * Opens the split, creates a generic Avro file reader on it and asks the reader to sync
+	 * to the split's start offset.
+	 */
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		final DatumReader<GenericRecord> genericReader = new GenericDatumReader<GenericRecord>();
+		// NOTE(review): the split length is narrowed to int here
+		final SeekableInput seekable = new FSDataInputStreamWrapper(stream, (int) split.getLength());
+		LOG.info("Opening split " + split);
+		this.dataFileReader = DataFileReader.openReader(seekable, genericReader);
+		this.dataFileReader.sync(split.getStart());
+	}
+
+	/** Reports whether the Avro reader has no further record. */
+	@Override
+	public boolean reachedEnd() throws IOException {
+		final boolean hasMore = dataFileReader.hasNext();
+		return !hasMore;
+	}
+
+	/**
+	 * Reads the next Avro record and copies its fields, by schema position, into the given record.
+	 *
+	 * @param record the record to fill
+	 * @return the filled record, or {@code null} when the reader has no further record
+	 * @throws IOException if reading from the Avro file fails
+	 * @throws IllegalArgumentException if {@code record} is null while data is still available
+	 */
+	@Override
+	public Record nextRecord(Record record) throws IOException {
+		if (!dataFileReader.hasNext()) {
+			return null;
+		}
+		if (record == null) {
+			throw new IllegalArgumentException("Empty PactRecord given");
+		}
+		// the previously returned GenericRecord is handed back so the reader can reuse it
+		reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
+		final List<Field> fields = reuseAvroRecord.getSchema().getFields();
+		for (Field field : fields) {
+			final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
+			record.setField(field.pos(), value);
+			// NOTE(review): called once per field rather than once after the loop. Primitive
+			// values come back in shared holder objects (sString, sInt, ...), so presumably each
+			// field has to be written into the record before the next field of the same type
+			// overwrites the holder -- confirm before hoisting this call out of the loop.
+			record.updateBinaryRepresenation();
+		}
+
+		return record;
+	}
+
+
+	/**
+	 * Converts the value of one Avro field into the matching {@link Value}.
+	 *
+	 * Arrays and maps are copied into new list/map values; enum symbols and primitives are
+	 * written into the shared holder objects of this instance. For a nullable field
+	 * (a union with null) the element type, enum symbols and map value type are read from
+	 * the non-null union branch, because the field schema itself is the union.
+	 *
+	 * @param field the Avro field the value belongs to
+	 * @param avroRecord the field value as returned by the Avro reader
+	 * @return the converted value, or {@code null} if {@code avroRecord} is null
+	 * @throws RuntimeException if the type is unsupported or an enum symbol is not declared
+	 */
+	@SuppressWarnings("unchecked")
+	private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
+		if (avroRecord == null) {
+			return null;
+		}
+		final Type type = checkTypeConstraintsAndGetType(field.schema());
+		final Schema valueSchema = selectUnionBranch(field.schema(), type);
+
+		// check for complex types
+		// (complex type FIXED is not yet supported)
+		switch (type) {
+			case ARRAY:
+				final Type elementType = valueSchema.getElementType().getType();
+				final List<?> avroList = (List<?>) avroRecord;
+				return convertAvroArrayToListValue(elementType, avroList);
+			case ENUM:
+				final List<String> symbols = valueSchema.getEnumSymbols();
+				final String avroRecordString = avroRecord.toString();
+				if (!symbols.contains(avroRecordString)) {
+					throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
+				}
+				sString.setValue(avroRecordString);
+				return sString;
+			case MAP:
+				final Type valueType = valueSchema.getValueType().getType();
+				final Map<CharSequence, ?> avroMap = (Map<CharSequence, ?>) avroRecord;
+				return convertAvroMapToMapValue(valueType, avroMap);
+
+			// primitive type
+			default:
+				return convertAvroPrimitiveToValue(type, avroRecord);
+
+		}
+	}
+
+	/** Returns the union branch of the given type, or the schema itself if it is not a union or has no such branch. */
+	private static Schema selectUnionBranch(final Schema schema, final Type type) {
+		if (schema.getType() != Type.UNION) {
+			return schema;
+		}
+		for (Schema branch : schema.getTypes()) {
+			if (branch.getType() == type) {
+				return branch;
+			}
+		}
+		return schema;
+	}
+
+	/**
+	 * Copies an Avro array into a newly created list value of the matching element type.
+	 *
+	 * @param elementType the Avro type of the array elements
+	 * @param avroList the array as returned by the Avro reader
+	 * @return a new list value holding one new value object per element
+	 * @throws RuntimeException if the element type is not STRING, INT, BOOLEAN, DOUBLE, FLOAT or LONG
+	 */
+	private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
+		switch (elementType) {
+		case STRING: {
+			final StringListValue strings = new StringListValue();
+			for (final Object element : avroList) {
+				strings.add(new StringValue((CharSequence) element));
+			}
+			return strings;
+		}
+		case INT: {
+			final IntListValue ints = new IntListValue();
+			for (final Object element : avroList) {
+				ints.add(new IntValue((Integer) element));
+			}
+			return ints;
+		}
+		case BOOLEAN: {
+			final BooleanListValue booleans = new BooleanListValue();
+			for (final Object element : avroList) {
+				booleans.add(new BooleanValue((Boolean) element));
+			}
+			return booleans;
+		}
+		case DOUBLE: {
+			final DoubleListValue doubles = new DoubleListValue();
+			for (final Object element : avroList) {
+				doubles.add(new DoubleValue((Double) element));
+			}
+			return doubles;
+		}
+		case FLOAT: {
+			final FloatListValue floats = new FloatListValue();
+			for (final Object element : avroList) {
+				floats.add(new FloatValue((Float) element));
+			}
+			return floats;
+		}
+		case LONG: {
+			final LongListValue longs = new LongListValue();
+			for (final Object element : avroList) {
+				longs.add(new LongValue((Long) element));
+			}
+			return longs;
+		}
+		default:
+			throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
+		}
+	}
+
+	/**
+	 * Copies an Avro map into a newly created map value with string keys and values of the
+	 * matching type.
+	 *
+	 * String map values are cast to {@link CharSequence} rather than {@link String}, the same
+	 * way the keys and the array elements are, because the Avro reader does not necessarily
+	 * hand out {@code java.lang.String} instances.
+	 *
+	 * @param mapValueType the Avro type of the map values
+	 * @param avroMap the map as returned by the Avro reader
+	 * @return a new map value holding one new key/value pair per entry
+	 * @throws RuntimeException if the value type is not STRING, INT, BOOLEAN, DOUBLE, FLOAT or LONG
+	 */
+	private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
+		switch (mapValueType) {
+		case STRING:
+			StringMapValue sm = new StringMapValue();
+			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+				sm.put(new StringValue(entry.getKey()), new StringValue((CharSequence) entry.getValue()));
+			}
+			return sm;
+		case INT:
+			IntMapValue im = new IntMapValue();
+			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+				im.put(new StringValue(entry.getKey()), new IntValue((Integer) entry.getValue()));
+			}
+			return im;
+		case BOOLEAN:
+			BooleanMapValue bm = new BooleanMapValue();
+			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+				bm.put(new StringValue(entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
+			}
+			return bm;
+		case DOUBLE:
+			DoubleMapValue dm = new DoubleMapValue();
+			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+				dm.put(new StringValue(entry.getKey()), new DoubleValue((Double) entry.getValue()));
+			}
+			return dm;
+		case FLOAT:
+			FloatMapValue fm = new FloatMapValue();
+			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+				fm.put(new StringValue(entry.getKey()), new FloatValue((Float) entry.getValue()));
+			}
+			return fm;
+		case LONG:
+			LongMapValue lm = new LongMapValue();
+			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
+				lm.put(new StringValue(entry.getKey()), new LongValue((Long) entry.getValue()));
+			}
+			return lm;
+
+		default:
+			throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
+		}
+	}
+
+	private StringValue sString = new StringValue();
+	private IntValue sInt = new IntValue();
+	private BooleanValue sBool = new BooleanValue();
+	private DoubleValue sDouble = new DoubleValue();
+	private FloatValue sFloat = new FloatValue();
+	private LongValue sLong = new LongValue();
+	
+	/**
+	 * Writes a primitive Avro value into the shared holder object of the matching type and
+	 * returns that holder; NULL maps to the {@link NullValue} singleton.
+	 *
+	 * @param type the Avro type of the value
+	 * @param avroRecord the value as returned by the Avro reader
+	 * @return the shared holder carrying the value
+	 * @throws RuntimeException if the type is not one of the handled primitive types
+	 */
+	private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
+		final Value converted;
+		switch (type) {
+		case STRING:
+			sString.setValue((CharSequence) avroRecord);
+			converted = sString;
+			break;
+		case INT:
+			sInt.setValue((Integer) avroRecord);
+			converted = sInt;
+			break;
+		case BOOLEAN:
+			sBool.setValue((Boolean) avroRecord);
+			converted = sBool;
+			break;
+		case DOUBLE:
+			sDouble.setValue((Double) avroRecord);
+			converted = sDouble;
+			break;
+		case FLOAT:
+			sFloat.setValue((Float) avroRecord);
+			converted = sFloat;
+			break;
+		case LONG:
+			sLong.setValue((Long) avroRecord);
+			converted = sLong;
+			break;
+		case NULL:
+			converted = NullValue.getInstance();
+			break;
+		default:
+			throw new RuntimeException("Type " + type + " for AvroInputFormat is not implemented. Open an issue on GitHub.");
+		}
+		return converted;
+	}
+
+	/**
+	 * Validates a field schema and returns the type its values are converted as.
+	 *
+	 * Non-union schemas yield their own type, except RECORD, which is rejected. A union must
+	 * have one or two branches, none of them a union itself; a single-branch union yields that
+	 * branch's type, and a two-branch union must contain a null branch and yields the type of
+	 * the other branch.
+	 *
+	 * @param schema the schema of the field
+	 * @return the effective type of the field
+	 * @throws RuntimeException if the schema violates one of the constraints above
+	 */
+	private final Type checkTypeConstraintsAndGetType(final Schema schema) {
+		final Type type = schema.getType();
+		if (type == Type.RECORD) {
+			throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
+		}
+		if (type != Type.UNION) {
+			return type;
+		}
+
+		final List<Schema> types = schema.getTypes();
+		// an empty union used to fail with an IndexOutOfBoundsException further down
+		if (types.isEmpty()) {
+			throw new RuntimeException("The given Avro file contains an empty union");
+		}
+		if (types.size() > 2) {
+			throw new RuntimeException("The given Avro file contains a union that has more than two elements");
+		}
+		for (Schema branch : types) {
+			if (branch.getType() == Type.UNION) {
+				throw new RuntimeException("The given Avro file contains a nested union");
+			}
+		}
+		if (types.size() == 1) {
+			return types.get(0).getType();
+		}
+
+		final Type first = types.get(0).getType();
+		final Type second = types.get(1).getType();
+		if (first == Type.NULL) {
+			return second;
+		}
+		if (second != Type.NULL) {
+			throw new RuntimeException("The given Avro file contains a union with two non-null types.");
+		}
+		return first;
+	}
+
+	/**
+	 * Creates the input splits, requesting from the superclass as many splits as there are
+	 * accepted files. The {@code minNumSplits} argument is not used.
+	 *
+	 * @throws IOException if the input path itself does not pass the file filter
+	 */
+	@Override
+	public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+		final Path inputPath = this.filePath;
+		final FileSystem fileSystem = inputPath.getFileSystem();
+		final FileStatus inputStatus = fileSystem.getFileStatus(inputPath);
+
+		if (!acceptFile(inputStatus)) {
+			throw new IOException("The given file does not pass the file-filter");
+		}
+
+		// a single file counts as one; for a directory count the accepted plain files in it
+		int acceptedFiles = 1;
+		if (inputStatus.isDir()) {
+			acceptedFiles = 0;
+			for (FileStatus child : fileSystem.listStatus(inputPath)) {
+				if (child.isDir() || !acceptFile(child)) {
+					continue;
+				}
+				acceptedFiles++;
+			}
+		}
+		return super.createInputSplits(acceptedFiles);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Concrete subclasses of ListValue and MapValue for all possible primitive types
+	// --------------------------------------------------------------------------------------------
+
+	/** Concrete {@link ListValue} of {@link StringValue} elements. */
+	public static class StringListValue extends ListValue<StringValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link ListValue} of {@link IntValue} elements. */
+	public static class IntListValue extends ListValue<IntValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link ListValue} of {@link BooleanValue} elements. */
+	public static class BooleanListValue extends ListValue<BooleanValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link ListValue} of {@link DoubleValue} elements. */
+	public static class DoubleListValue extends ListValue<DoubleValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link ListValue} of {@link FloatValue} elements. */
+	public static class FloatListValue extends ListValue<FloatValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link ListValue} of {@link LongValue} elements. */
+	public static class LongListValue extends ListValue<LongValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link MapValue} from {@link StringValue} keys to {@link StringValue} values. */
+	public static class StringMapValue extends MapValue<StringValue, StringValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link MapValue} from {@link StringValue} keys to {@link IntValue} values. */
+	public static class IntMapValue extends MapValue<StringValue, IntValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link MapValue} from {@link StringValue} keys to {@link BooleanValue} values. */
+	public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link MapValue} from {@link StringValue} keys to {@link DoubleValue} values. */
+	public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link MapValue} from {@link StringValue} keys to {@link FloatValue} values. */
+	public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+	/** Concrete {@link MapValue} from {@link StringValue} keys to {@link LongValue} values. */
+	public static class LongMapValue extends MapValue<StringValue, LongValue> {
+		private static final long serialVersionUID = 1L;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
new file mode 100644
index 0000000..9cdaef0
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.avro.example;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.record.functions.MapFunction;
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.record.io.GenericInputFormat;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.client.LocalExecutor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+
+
+public class ReflectiveAvroTypeExample {
+	
+	
+	/**
+	 * Wires the generating source, the mapper, the reducer (keyed on the {@link IntValue} in
+	 * field 1) and the printing sink into a plan and runs it locally with parallelism 4.
+	 */
+	public static void main(String[] args) throws Exception {
+		final GenericDataSource<UserGeneratingInputFormat> userSource =
+				new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
+
+		final MapOperator numberMapper = MapOperator.builder(new NumberExtractingMapper())
+				.input(userSource)
+				.name("le mapper")
+				.build();
+
+		final ReduceOperator colorReducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
+				.input(numberMapper)
+				.name("le reducer")
+				.build();
+
+		final GenericDataSink printSink = new GenericDataSink(PrintingOutputFormat.class, colorReducer);
+
+		final Plan plan = new Plan(printSink);
+		plan.setDefaultParallelism(4);
+
+		LocalExecutor.execute(plan);
+	}
+	
+	
+	/** Copies the favorite number of the user in field 0 into field 1 and forwards the record. */
+	public static final class NumberExtractingMapper extends MapFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void map(Record record, Collector<Record> out) throws Exception {
+			final SUser wrapped = record.getField(0, SUser.class);
+			final User user = wrapped.datum();
+			final IntValue favoriteNumber = new IntValue(user.getFavoriteNumber());
+			record.setField(1, favoriteNumber);
+			out.collect(record);
+		}
+	}
+	
+	
+	/**
+	 * Emits one record per group: field 0 is the number taken from field 1 of the first
+	 * record, field 1 is the favorite colors of all records in the group joined by " - ".
+	 */
+	public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		private final Record result = new Record(2);
+
+		@Override
+		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+			Record r = records.next();
+
+			// read everything needed from the current record before advancing the iterator
+			final int num = r.getField(1, IntValue.class).getValue();
+			// a builder avoids re-copying the growing string on every iteration
+			final StringBuilder colors = new StringBuilder(r.getField(0, SUser.class).datum().getFavoriteColor().toString());
+
+			while (records.hasNext()) {
+				r = records.next();
+				colors.append(" - ").append(r.getField(0, SUser.class).datum().getFavoriteColor().toString());
+			}
+
+			result.setField(0, new IntValue(num));
+			result.setField(1, new StringValue(colors.toString()));
+			out.collect(result);
+		}
+
+	}
+	
+	
+	/**
+	 * Input format that generates {@code NUM} users with a name, a favorite color and a
+	 * favorite number drawn from a fixed-seed random generator.
+	 */
+	public static final class UserGeneratingInputFormat extends GenericInputFormat {
+
+		private static final long serialVersionUID = 1L;
+
+		private static final int NUM = 100;
+
+		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
+
+		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
+
+		private final Random rnd = new Random(32498562304986L);
+
+		/** Number of records handed out so far. */
+		private int count;
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return !(count < NUM);
+		}
+
+		@Override
+		public Record nextRecord(Record record) throws IOException {
+			count++;
+
+			// keep the draw order name, color, number: it determines the generated sequence
+			final User user = new User();
+			user.setName(NAMES[rnd.nextInt(NAMES.length)]);
+			user.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
+			user.setFavoriteNumber(rnd.nextInt(87));
+
+			final SUser wrapped = new SUser();
+			wrapped.datum(user);
+
+			record.setField(0, wrapped);
+			return record;
+		}
+	}
+	
+	/** Output format that prints the int in field 0 and the string in field 1 to standard out. */
+	public static final class PrintingOutputFormat implements OutputFormat<Record> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void configure(Configuration parameters) {
+			// nothing to configure
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
+			// nothing to open
+		}
+
+		@Override
+		public void writeRecord(Record record) throws IOException {
+			final int number = record.getField(0, IntValue.class).getValue();
+			final String text = record.getField(1, StringValue.class).getValue();
+
+			System.out.println(number + ": " + text);
+		}
+
+		@Override
+		public void close() throws IOException {
+			// nothing to close
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
new file mode 100644
index 0000000..2fdfc05
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.avro.example;
+
+import org.apache.flink.api.avro.AvroBaseValue;
+
+/**
+ * {@link AvroBaseValue} subclass bound to the Avro-generated {@link User} type.
+ */
+public class SUser extends AvroBaseValue<User> {
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
new file mode 100644
index 0000000..7f48775
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.java.record.io.avro.example;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  @Deprecated public java.lang.CharSequence name;
+  @Deprecated public java.lang.Integer favorite_number;
+  @Deprecated public java.lang.CharSequence favorite_color;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use {@link \#newBuilder()}. 
+   */
+  public User() {}
+
+  /**
+   * All-args constructor.
+   */
+  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
+    this.name = name;
+    this.favorite_number = favorite_number;
+    this.favorite_color = favorite_color;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return favorite_number;
+    case 2: return favorite_color;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: favorite_number = (java.lang.Integer)value$; break;
+    case 2: favorite_color = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field.
+   * @param value the value to set.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_number' field.
+   */
+  public java.lang.Integer getFavoriteNumber() {
+    return favorite_number;
+  }
+
+  /**
+   * Sets the value of the 'favorite_number' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteNumber(java.lang.Integer value) {
+    this.favorite_number = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_color' field.
+   */
+  public java.lang.CharSequence getFavoriteColor() {
+    return favorite_color;
+  }
+
+  /**
+   * Sets the value of the 'favorite_color' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteColor(java.lang.CharSequence value) {
+    this.favorite_color = value;
+  }
+
+  /** Creates a new User RecordBuilder */
+  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
+    return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing Builder */
+  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
+    return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing User instance */
+  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
+    return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for User instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+    implements org.apache.avro.data.RecordBuilder<User> {
+
+    private java.lang.CharSequence name;
+    private java.lang.Integer favorite_number;
+    private java.lang.CharSequence favorite_color;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+    }
+    
+    /** Creates a Builder by copying an existing User instance */
+    private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
+            super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+    }
+
+    /** Gets the value of the 'name' field */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the value of the 'name' field */
+    public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'name' field has been set */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'name' field */
+    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_number' field */
+    public java.lang.Integer getFavoriteNumber() {
+      return favorite_number;
+    }
+    
+    /** Sets the value of the 'favorite_number' field */
+    public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
+      validate(fields()[1], value);
+      this.favorite_number = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_number' field has been set */
+    public boolean hasFavoriteNumber() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the value of the 'favorite_number' field */
+    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
+      favorite_number = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_color' field */
+    public java.lang.CharSequence getFavoriteColor() {
+      return favorite_color;
+    }
+    
+    /** Sets the value of the 'favorite_color' field */
+    public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.favorite_color = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_color' field has been set */
+    public boolean hasFavoriteColor() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'favorite_color' field */
+    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
+      favorite_color = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    @Override
+    public User build() {
+      try {
+        User record = new User();
+        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/assembly/test-assembly.xml b/flink-addons/flink-avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..8316581
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,31 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!--
+  Assembly descriptor with id "test-jar": packs the compiled test classes below
+  org/apache/flink/api/avro/testjar into a jar archive.
+-->
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<!-- Do not wrap the archive content in an extra base directory. -->
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<!-- Source: the module's test output directory; target: the root of the archive. -->
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!-- Modify or add include patterns to match your package(s). -->
+			<includes>
+				<include>org/apache/flink/api/avro/testjar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
new file mode 100644
index 0000000..4a6e7f1
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.LogUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Integration test that starts a local {@link NepheleMiniCluster}, wraps the jar at
+ * {@link #JAR_FILE} into a {@link PackagedProgram} and runs it through a {@link Client},
+ * passing the location of the Avro test data as the only program argument.
+ */
+public class AvroExternalJarProgramITCase {
+
+	/** RPC port the mini cluster's job manager is configured with and the client connects to. */
+	private static final int TEST_JM_PORT = 43191;
+	
+	/** Path of the program jar; being relative, it is resolved against the working directory. */
+	private static final String JAR_FILE = "target/maven-test-jar.jar";
+	
+	/** Classpath resource whose URL is handed to the program as its argument. */
+	private static final String TEST_DATA_FILE = "/testdata.avro";
+
+	static {
+		LogUtils.initializeDefaultTestConsoleLogger();
+	}
+	
+	/**
+	 * Runs the packaged program against the mini cluster and fails the test if anything
+	 * is thrown while starting the cluster or executing the program.
+	 */
+	@Test
+	public void testExternalProgram() {
+		
+		// Resolve the test data before starting the cluster: getResource() returns null for a
+		// missing resource, which would otherwise only show up as an NPE with the message "null".
+		java.net.URL testDataUrl = getClass().getResource(TEST_DATA_FILE);
+		Assert.assertNotNull("Test data resource " + TEST_DATA_FILE + " not found on the classpath.", testDataUrl);
+		
+		NepheleMiniCluster testMiniCluster = null;
+		
+		try {
+			testMiniCluster = new NepheleMiniCluster();
+			testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
+			testMiniCluster.setTaskManagerNumSlots(4);
+			testMiniCluster.start();
+			
+			String jarFile = JAR_FILE;
+			String testData = testDataUrl.toString();
+			
+			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
+						
+			Client c = new Client(new InetSocketAddress("localhost", TEST_JM_PORT), new Configuration());
+			c.run(program, 4, true);
+		}
+		catch (Throwable t) {
+			System.err.println(t.getMessage());
+			t.printStackTrace();
+			Assert.fail("Error during the packaged program execution: " + t.getMessage());
+		}
+		finally {
+			if (testMiniCluster != null) {
+				try {
+					testMiniCluster.stop();
+				} catch (Throwable t) {
+					// Best effort: a failed shutdown must not change the test outcome,
+					// but it is reported instead of being dropped silently.
+					System.err.println("Error while stopping the test mini cluster: " + t.getMessage());
+					t.printStackTrace();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
new file mode 100644
index 0000000..637a5e9
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.record.io.avro.example.User;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Writes the users from {@link #userData} with {@link AvroOutputFormat}, once as the
+ * Avro-specific {@link User} type and once as the plain {@link ReflectiveUser} class,
+ * then reads the written files back and checks that every input line is present.
+ */
+@SuppressWarnings("serial")
+public class AvroOutputFormatTest extends JavaProgramTestBase {
+
+	/** Output location of the run that writes {@link User} records. */
+	public static String outputPath1;
+
+	/** Output location of the run that writes {@link ReflectiveUser} records. */
+	public static String outputPath2;
+
+	/** Location of the temporary input file holding {@link #userData}. */
+	public static String inputPath;
+
+	/** Input data: one user per line as {@code name|favoriteNumber|favoriteColor}. */
+	public static String userData = "alice|1|blue\n" +
+		"bob|2|red\n" +
+		"john|3|yellow\n" +
+		"walt|4|black\n";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		inputPath = createTempFile("user", userData);
+		outputPath1 = getTempDirPath("avro_output1");
+		outputPath2 = getTempDirPath("avro_output2");
+	}
+
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
+			.fieldDelimiter('|')
+			.types(String.class, Integer.class, String.class);
+
+		//output the data with AvroOutputFormat for specific user type
+		DataSet<User> specificUser = input.map(new ConvertToUser());
+		specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
+
+		//output the data with AvroOutputFormat for reflect user type
+		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
+		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		//compare result for specific user type
+		List<String> result1 = new ArrayList<String>();
+		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
+		for (File avroOutput : outputFilesOf(outputPath1)) {
+			DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
+			try {
+				while (dataFileReader1.hasNext()) {
+					User user = dataFileReader1.next();
+					result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+				}
+			} finally {
+				// release the file handle even if reading a record fails
+				dataFileReader1.close();
+			}
+		}
+		assertAllUsersFound(result1);
+
+		//compare result for reflect user type
+		List<String> result2 = new ArrayList<String>();
+		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
+		for (File avroOutput : outputFilesOf(outputPath2)) {
+			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
+			try {
+				while (dataFileReader2.hasNext()) {
+					ReflectiveUser user = dataFileReader2.next();
+					result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+				}
+			} finally {
+				// release the file handle even if reading a record fails
+				dataFileReader2.close();
+			}
+		}
+		assertAllUsersFound(result2);
+	}
+
+	/**
+	 * Returns the files written to the given output path: the directory's entries if the
+	 * path is a directory, otherwise the path itself as a single file.
+	 *
+	 * @param outputPath the output path to inspect
+	 * @return the files to read back
+	 */
+	private File[] outputFilesOf(String outputPath) throws Exception {
+		File output = asFile(outputPath);
+		if (!output.isDirectory()) {
+			return new File[] {output};
+		}
+		File[] parts = output.listFiles();
+		// listFiles() returns null if the directory cannot be read; fail clearly instead of with an NPE
+		Assert.assertNotNull("could not list the files in output directory " + output, parts);
+		return parts;
+	}
+
+	/**
+	 * Asserts that every line of {@link #userData} is contained in the given result.
+	 *
+	 * @param result the lines reconstructed from the written Avro files
+	 */
+	private static void assertAllUsersFound(List<String> result) {
+		for (String expectedResult : userData.split("\n")) {
+			Assert.assertTrue("expected user " + expectedResult + " not found.", result.contains(expectedResult));
+		}
+	}
+
+
+	/** Maps a (name, favorite number, favorite color) tuple to a {@link User}. */
+	public final static class ConvertToUser extends MapFunction<Tuple3<String, Integer, String>, User> {
+
+		@Override
+		public User map(Tuple3<String, Integer, String> value) throws Exception {
+			return new User(value.f0, value.f1, value.f2);
+		}
+	}
+
+	/** Maps a {@link User} to a {@link ReflectiveUser} carrying the same three values. */
+	public final static class ConvertToReflective extends MapFunction<User, ReflectiveUser> {
+
+		@Override
+		public ReflectiveUser map(User value) throws Exception {
+			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
+		}
+	}
+
+	
+	/** Plain user class (no Avro-specific base type) written through the reflect-based path. */
+	public static class ReflectiveUser {
+		private String name;
+		private int favoriteNumber;
+		private String favoriteColor;
+
+		public ReflectiveUser() {}
+
+		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
+			this.name = name;
+			this.favoriteNumber = favoriteNumber;
+			this.favoriteColor = favoriteColor;
+		}
+		
+		public String getName() {
+			return this.name;
+		}
+		public String getFavoriteColor() {
+			return this.favoriteColor;
+		}
+		public int getFavoriteNumber() {
+			return this.favoriteNumber;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
new file mode 100644
index 0000000..ea9edff
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.avro.reflect.Nullable;
+import org.apache.flink.api.avro.AvroBaseValue;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.java.record.io.GenericInputFormat;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.RecordAPITestBase;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Record-API test job that co-groups one record carrying a {@link Book} with one record
+ * carrying a {@link BookAuthor} whose title list is empty, both wrapped in
+ * {@link AvroBaseValue} subclasses.
+ */
+public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
+
+	/**
+	 * Builds the plan: two generic sources (book side and author side), a co-group on
+	 * field 0 of both inputs, and a printing sink. Default parallelism is set to 1.
+	 */
+	@Override
+	protected Plan getTestJob() {
+		final RandomInputFormat bookFormat = new RandomInputFormat(true);
+		final GenericDataSource<RandomInputFormat> books = new GenericDataSource<RandomInputFormat>(bookFormat);
+
+		final RandomInputFormat authorFormat = new RandomInputFormat(false);
+		final GenericDataSource<RandomInputFormat> authors = new GenericDataSource<RandomInputFormat>(authorFormat);
+
+		final CoGroupOperator grouped = CoGroupOperator
+			.builder(MyCoGrouper.class, LongValue.class, 0, 0)
+			.input1(books)
+			.input2(authors)
+			.name("CoGrouper Test")
+			.build();
+
+		final GenericDataSink printer = new GenericDataSink(PrintingOutputFormat.class, grouped);
+
+		final Plan testPlan = new Plan(printer, "CoGroper Test Plan");
+		testPlan.setDefaultParallelism(1);
+		return testPlan;
+	}
+
+	/** {@link AvroBaseValue} wrapper around a {@link Book}. */
+	public static class SBookAvroValue extends AvroBaseValue<Book> {
+		private static final long serialVersionUID = 1L;
+
+		public SBookAvroValue() {}
+
+		public SBookAvroValue(Book datum) {
+			super(datum);
+		}
+	}
+
+	/** Simple book type; the title is marked {@code @Nullable}. */
+	public static class Book {
+
+		long bookId;
+		@Nullable
+		String title;
+		long authorId;
+
+		public Book() {
+		}
+
+		public Book(long bookId, String title, long authorId) {
+			this.authorId = authorId;
+			this.title = title;
+			this.bookId = bookId;
+		}
+	}
+
+	/** {@link AvroBaseValue} wrapper around a {@link BookAuthor}. */
+	public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
+		private static final long serialVersionUID = 1L;
+
+		public SBookAuthorValue() {}
+
+		public SBookAuthorValue(BookAuthor datum) {
+			super(datum);
+		}
+	}
+
+	/** Author type with two {@code @Nullable} lists, a name and an enum-typed field. */
+	public static class BookAuthor {
+
+		enum BookType {
+			book,
+			article,
+			journal
+		}
+
+		long authorId;
+
+		@Nullable
+		List<String> bookTitles;
+
+		@Nullable
+		List<Book> books;
+
+		String authorName;
+
+		BookType bookType;
+
+		public BookAuthor() {}
+
+		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
+			this.authorName = authorName;
+			this.bookTitles = bookTitles;
+			this.authorId = authorId;
+		}
+	}
+
+	/**
+	 * Input format that produces exactly one record: field 0 holds a fixed key, field 1
+	 * holds either a book value or an author value, depending on the constructor flag.
+	 */
+	public static class RandomInputFormat extends GenericInputFormat {
+		private static final long serialVersionUID = 1L;
+
+		private final boolean isBook;
+
+		// flips to true once the single record has been handed out
+		private boolean touched = false;
+
+		public RandomInputFormat(boolean isBook) {
+			this.isBook = isBook;
+		}
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return touched;
+		}
+
+		@Override
+		public Record nextRecord(Record record) throws IOException {
+			touched = true;
+			record.setField(0, new LongValue(26382648));
+
+			if (!isBook) {
+				record.setField(1, new SBookAuthorValue(newAuthorWithEmptyTitles()));
+				return record;
+			}
+
+			final Book singleBook = new Book(123, "This is a test book", 26382648);
+			record.setField(1, new SBookAvroValue(singleBook));
+			return record;
+		}
+
+		/** Creates the test author: empty title list, three books, book type journal. */
+		private static BookAuthor newAuthorWithEmptyTitles() {
+			// the title list deliberately stays empty
+			final List<String> noTitles = new ArrayList<String>();
+			// titles.add("Title1");
+			// titles.add("Title2");
+			// titles.add("Title3");
+
+			final List<Book> authoredBooks = new ArrayList<Book>();
+			authoredBooks.add(new Book(123, "This is a test book", 1));
+			authoredBooks.add(new Book(24234234, "This is a test book", 1));
+			authoredBooks.add(new Book(1234324, "This is a test book", 3));
+
+			final BookAuthor author = new BookAuthor(1, noTitles, "Test Author");
+			author.books = authoredBooks;
+			author.bookType = BookAuthor.BookType.journal;
+			return author;
+		}
+	}
+
+	/** Output format that prints field 0 (long) and field 1 (string) of each record to stdout. */
+	public static final class PrintingOutputFormat implements OutputFormat<Record> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) {}
+
+		@Override
+		public void writeRecord(Record record) throws IOException {
+			final long recordKey = record.getField(0, LongValue.class).getValue();
+			final String recordText = record.getField(1, StringValue.class).getValue();
+			final StringBuilder line = new StringBuilder();
+			line.append(recordKey).append(" : ").append(recordText);
+			System.out.println(line.toString());
+		}
+
+		@Override
+		public void close() {}
+	}
+
+	/**
+	 * Co-group function that takes the first record of each side, if any, and calls
+	 * {@code datum()} on the Avro value in field 1. The results are discarded and nothing
+	 * is emitted to the collector.
+	 */
+	public static class MyCoGrouper extends CoGroupFunction {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out)
+				throws Exception {
+
+			final Record bookRecord = records1.hasNext() ? records1.next() : null;
+			final Record authorRecord = records2.hasNext() ? records2.next() : null;
+
+			if (bookRecord != null) {
+				bookRecord.getField(1, SBookAvroValue.class).datum();
+			}
+
+			if (authorRecord != null) {
+				authorRecord.getField(1, SBookAuthorValue.class).datum();
+			}
+		}
+	}
+}