You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 19:59:01 UTC

[2/9] git commit: [FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.

[FLINK-1117] Clean up flink-avro project: remove deprecated AvroRecord format, migrate tests to new java api.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/626d6b78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/626d6b78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/626d6b78

Branch: refs/heads/master
Commit: 626d6b785db649e06951e2f336f5ca411b30dce5
Parents: 38e4755
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 30 14:15:22 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 18:39:58 2014 +0200

----------------------------------------------------------------------
 flink-addons/flink-avro/pom.xml                 |   7 +-
 .../apache/flink/api/avro/AvroBaseValue.java    | 153 ----
 .../api/io/avro/example/AvroTypeExample.java    | 111 +++
 .../apache/flink/api/io/avro/example/User.java  | 269 +++++++
 .../java/record/io/avro/AvroInputFormat.java    | 111 ---
 .../record/io/avro/AvroRecordInputFormat.java   | 374 ---------
 .../avro/example/ReflectiveAvroTypeExample.java | 161 ----
 .../api/java/record/io/avro/example/SUser.java  |  25 -
 .../api/java/record/io/avro/example/User.java   | 269 -------
 .../flink/api/avro/AvroOutputFormatTest.java    |   2 +-
 .../api/avro/AvroWithEmptyArrayITCase.java      | 217 ------
 .../flink/api/avro/EncoderDecoderTest.java      |   4 +-
 .../avro/testjar/AvroExternalJarProgram.java    |  13 -
 .../api/io/avro/AvroRecordInputFormatTest.java  | 167 ++++
 .../flink/api/io/avro/generated/Colors.java     |  32 +
 .../flink/api/io/avro/generated/User.java       | 755 +++++++++++++++++++
 .../io/avro/AvroRecordInputFormatTest.java      | 167 ----
 .../java/record/io/avro/generated/Colors.java   |  32 -
 .../api/java/record/io/avro/generated/User.java | 755 -------------------
 .../runtime/AvroSerializerEmptyArrayTest.java   | 189 +++++
 tools/maven/suppressions.xml                    |   2 +-
 21 files changed, 1531 insertions(+), 2284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
index 68f722c..ff4decc 100644
--- a/flink-addons/flink-avro/pom.xml
+++ b/flink-addons/flink-avro/pom.xml
@@ -54,10 +54,11 @@ under the License.
 			<!-- version is derived from base module -->
 		</dependency>
 		
+		<!-- guava needs to be in "provided" scope, so as not to re-export the dependency -->
 		<dependency>
-			<groupId>org.apache.avro</groupId>
-			<artifactId>avro-mapred</artifactId>
-			<!-- version is derived from base module -->
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<scope>provided</scope>
 		</dependency>
 		
 		<dependency>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
deleted file mode 100644
index 0d64910..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/avro/AvroBaseValue.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroValue;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Key;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public abstract class AvroBaseValue<T> extends AvroValue<T> implements Key<AvroBaseValue<T>> {
-	
-	private static final long serialVersionUID = 1L;
-
-
-	public AvroBaseValue() {}
-	
-	public AvroBaseValue(T datum) {
-		super(datum);
-	}
-
-	
-	// --------------------------------------------------------------------------------------------
-	//  Serialization / Deserialization
-	// --------------------------------------------------------------------------------------------
-	
-	private ReflectDatumWriter<T> writer;
-	private ReflectDatumReader<T> reader;
-	
-	private DataOutputEncoder encoder;
-	private DataInputDecoder decoder;
-	
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		// the null flag
-		if (datum() == null) {
-			out.writeBoolean(false);
-		} else {
-			out.writeBoolean(true);
-			
-			DataOutputEncoder encoder = getEncoder();
-			encoder.setOut(out);
-			getWriter().write(datum(), encoder);
-		}
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		// the null flag
-		if (in.readBoolean()) {
-			
-			DataInputDecoder decoder = getDecoder();
-			decoder.setIn(in);
-			datum(getReader().read(datum(), decoder));
-		}
-	}
-	
-	private ReflectDatumWriter<T> getWriter() {
-		if (this.writer == null) {
-			@SuppressWarnings("unchecked")
-			Class<T> clazz = (Class<T>) datum().getClass();
-			this.writer = new ReflectDatumWriter<T>(clazz);
-		}
-		return this.writer;
-	}
-	
-	private ReflectDatumReader<T> getReader() {
-		if (this.reader == null) {
-			Class<T> datumClass = ReflectionUtil.getTemplateType1(getClass());
-			this.reader = new ReflectDatumReader<T>(datumClass);
-		}
-		return this.reader;
-	}
-	
-	private DataOutputEncoder getEncoder() {
-		if (this.encoder == null) {
-			this.encoder = new DataOutputEncoder();
-		}
-		return this.encoder;
-	}
-	
-	private DataInputDecoder getDecoder() {
-		if (this.decoder == null) {
-			this.decoder = new DataInputDecoder();
-		}
-		return this.decoder;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Hashing / Equality
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return datum() == null ? 0 : datum().hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		if (obj.getClass() == this.getClass()) {
-			Object otherDatum = ((AvroBaseValue<?>) obj).datum();
-			Object thisDatum = datum();
-			
-			if (thisDatum == null) {
-				return otherDatum == null;
-			} else {
-				return thisDatum.equals(otherDatum);
-			}
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return "AvroBaseValue (" + datum() + ")";
-	}
-	
-	@SuppressWarnings("unchecked")
-	@Override
-	public int compareTo(AvroBaseValue<T> o) {
-		Object otherDatum = o.datum();
-		Object thisDatum = datum();
-		
-		if (thisDatum == null) {
-			return otherDatum == null ? 0 : -1;
-		} else {
-			return otherDatum == null ? 1: ((Comparable<Object>) thisDatum).compareTo(otherDatum);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
new file mode 100644
index 0000000..6affeec
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro.example;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class AvroTypeExample {
+	
+	
+	public static void main(String[] args) throws Exception {
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<User> users = env.createInput(new UserGeneratingInputFormat());
+		
+		users
+			.map(new NumberExtractingMapper())
+			.groupBy(1)
+			.reduceGroup(new ConcatenatingReducer())
+			.print();
+		
+		env.execute();
+	}
+	
+	
+	
+	public static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
+		
+		@Override
+		public Tuple2<User, Integer> map(User user) {
+			return new Tuple2<User, Integer>(user, user.getFavoriteNumber());
+		}
+	}
+	
+	
+	public static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
+
+		@Override
+		public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
+			int number = 0;
+			StringBuilder colors = new StringBuilder();
+			
+			for (Tuple2<User, Integer> u : values) {
+				number = u.f1;
+				colors.append(u.f0.getFavoriteColor()).append(" - ");
+			}
+			
+			colors.setLength(colors.length() - 3);
+			out.collect(new Tuple2<Integer, String>(number, colors.toString()));
+		}
+	}
+	
+	
+	public static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
+
+		private static final long serialVersionUID = 1L;
+		
+		private static final int NUM = 100;
+		
+		private final Random rnd = new Random(32498562304986L);
+		
+		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
+		
+		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
+		
+		private int count;
+		
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return count >= NUM;
+		}
+
+		@Override
+		public User nextRecord(User reuse) throws IOException {
+			count++;
+			
+			User u = new User();
+			u.setName(NAMES[rnd.nextInt(NAMES.length)]);
+			u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
+			u.setFavoriteNumber(rnd.nextInt(87));
+			return u;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
new file mode 100644
index 0000000..3394d60
--- /dev/null
+++ b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/io/avro/example/User.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.example;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+  @Deprecated public java.lang.CharSequence name;
+  @Deprecated public java.lang.Integer favorite_number;
+  @Deprecated public java.lang.CharSequence favorite_color;
+
+  /**
+   * Default constructor.  Note that this does not initialize fields
+   * to their default values from the schema.  If that is desired then
+   * one should use {@link #newBuilder()}.
+   */
+  public User() {}
+
+  /**
+   * All-args constructor.
+   */
+  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
+    this.name = name;
+    this.favorite_number = favorite_number;
+    this.favorite_color = favorite_color;
+  }
+
+  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
+  // Used by DatumWriter.  Applications should not call. 
+  public java.lang.Object get(int field$) {
+    switch (field$) {
+    case 0: return name;
+    case 1: return favorite_number;
+    case 2: return favorite_color;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+  // Used by DatumReader.  Applications should not call. 
+  @SuppressWarnings(value="unchecked")
+  public void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+    case 0: name = (java.lang.CharSequence)value$; break;
+    case 1: favorite_number = (java.lang.Integer)value$; break;
+    case 2: favorite_color = (java.lang.CharSequence)value$; break;
+    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'name' field.
+   */
+  public java.lang.CharSequence getName() {
+    return name;
+  }
+
+  /**
+   * Sets the value of the 'name' field.
+   * @param value the value to set.
+   */
+  public void setName(java.lang.CharSequence value) {
+    this.name = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_number' field.
+   */
+  public java.lang.Integer getFavoriteNumber() {
+    return favorite_number;
+  }
+
+  /**
+   * Sets the value of the 'favorite_number' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteNumber(java.lang.Integer value) {
+    this.favorite_number = value;
+  }
+
+  /**
+   * Gets the value of the 'favorite_color' field.
+   */
+  public java.lang.CharSequence getFavoriteColor() {
+    return favorite_color;
+  }
+
+  /**
+   * Sets the value of the 'favorite_color' field.
+   * @param value the value to set.
+   */
+  public void setFavoriteColor(java.lang.CharSequence value) {
+    this.favorite_color = value;
+  }
+
+  /** Creates a new User RecordBuilder */
+  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() {
+    return new org.apache.flink.api.io.avro.example.User.Builder();
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing Builder */
+  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
+    return new org.apache.flink.api.io.avro.example.User.Builder(other);
+  }
+  
+  /** Creates a new User RecordBuilder by copying an existing User instance */
+  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) {
+    return new org.apache.flink.api.io.avro.example.User.Builder(other);
+  }
+  
+  /**
+   * RecordBuilder for User instances.
+   */
+  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
+    implements org.apache.avro.data.RecordBuilder<User> {
+
+    private java.lang.CharSequence name;
+    private java.lang.Integer favorite_number;
+    private java.lang.CharSequence favorite_color;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+    }
+    
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
+      super(other);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+    }
+    
+    /** Creates a Builder by copying an existing User instance */
+    private Builder(org.apache.flink.api.io.avro.example.User other) {
+            super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
+      if (isValidValue(fields()[0], other.name)) {
+        this.name = data().deepCopy(fields()[0].schema(), other.name);
+        fieldSetFlags()[0] = true;
+      }
+      if (isValidValue(fields()[1], other.favorite_number)) {
+        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
+        fieldSetFlags()[1] = true;
+      }
+      if (isValidValue(fields()[2], other.favorite_color)) {
+        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
+        fieldSetFlags()[2] = true;
+      }
+    }
+
+    /** Gets the value of the 'name' field */
+    public java.lang.CharSequence getName() {
+      return name;
+    }
+    
+    /** Sets the value of the 'name' field */
+    public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
+      validate(fields()[0], value);
+      this.name = value;
+      fieldSetFlags()[0] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'name' field has been set */
+    public boolean hasName() {
+      return fieldSetFlags()[0];
+    }
+    
+    /** Clears the value of the 'name' field */
+    public org.apache.flink.api.io.avro.example.User.Builder clearName() {
+      name = null;
+      fieldSetFlags()[0] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_number' field */
+    public java.lang.Integer getFavoriteNumber() {
+      return favorite_number;
+    }
+    
+    /** Sets the value of the 'favorite_number' field */
+    public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
+      validate(fields()[1], value);
+      this.favorite_number = value;
+      fieldSetFlags()[1] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_number' field has been set */
+    public boolean hasFavoriteNumber() {
+      return fieldSetFlags()[1];
+    }
+    
+    /** Clears the value of the 'favorite_number' field */
+    public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() {
+      favorite_number = null;
+      fieldSetFlags()[1] = false;
+      return this;
+    }
+
+    /** Gets the value of the 'favorite_color' field */
+    public java.lang.CharSequence getFavoriteColor() {
+      return favorite_color;
+    }
+    
+    /** Sets the value of the 'favorite_color' field */
+    public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
+      validate(fields()[2], value);
+      this.favorite_color = value;
+      fieldSetFlags()[2] = true;
+      return this; 
+    }
+    
+    /** Checks whether the 'favorite_color' field has been set */
+    public boolean hasFavoriteColor() {
+      return fieldSetFlags()[2];
+    }
+    
+    /** Clears the value of the 'favorite_color' field */
+    public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() {
+      favorite_color = null;
+      fieldSetFlags()[2] = false;
+      return this;
+    }
+
+    @Override
+    public User build() {
+      try {
+        User record = new User();
+        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
+        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
+        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
deleted file mode 100644
index a898f8d..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroInputFormat.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.ReflectionUtil;
-
-
-public class AvroInputFormat<E> extends FileInputFormat {
-	
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
-	
-	
-	private final Class<? extends AvroBaseValue<E>> avroWrapperTypeClass;
-	
-	private final Class<E> avroValueType;
-	
-
-	private transient FileReader<E> dataFileReader;
-	
-	private transient E reuseAvroValue;
-	
-	private transient AvroBaseValue<E> wrapper;
-	
-	
-	public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass) {
-		this.avroWrapperTypeClass = wrapperClass;
-		this.avroValueType = ReflectionUtil.getTemplateType1(wrapperClass);
-		this.unsplittable = true;
-	}
-	
-	public AvroInputFormat(Class<? extends AvroBaseValue<E>> wrapperClass, Class<E> avroType) {
-		this.avroValueType = avroType;
-		this.avroWrapperTypeClass = wrapperClass;
-		this.unsplittable = true;
-	}
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-		
-		this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
-		
-		DatumReader<E> datumReader;
-		if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
-			datumReader = new SpecificDatumReader<E>(avroValueType);
-		} else {
-			datumReader = new ReflectDatumReader<E>(avroValueType);
-		}
-		
-		LOG.info("Opening split " + split);
-		
-		SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-		
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		dataFileReader.sync(split.getStart());
-		
-		reuseAvroValue = null;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !dataFileReader.hasNext();
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (!dataFileReader.hasNext()) {
-			return null;
-		}
-		
-		reuseAvroValue = dataFileReader.next(reuseAvroValue);
-		wrapper.datum(reuseAvroValue);
-		record.setField(0, wrapper);
-		return record;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
deleted file mode 100644
index 1639ec4..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormat.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.avro.FSDataInputStreamWrapper;
-import org.apache.flink.api.java.record.io.FileInputFormat;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-
-/**
- * Input format to read Avro files.
- * 
- * The input format currently supports only flat avro schemas. So there is no
- * support for complex types except for nullable primitve fields, e.g.
- * ["string", null] (See
- * http://avro.apache.org/docs/current/spec.html#schema_complex)
- * 
- */
-public class AvroRecordInputFormat extends FileInputFormat {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(AvroRecordInputFormat.class);
-
-	private FileReader<GenericRecord> dataFileReader;
-	private GenericRecord reuseAvroRecord = null;
-
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-		DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
-		SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
-		LOG.info("Opening split " + split);
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		dataFileReader.sync(split.getStart());
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !dataFileReader.hasNext();
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (!dataFileReader.hasNext()) {
-			return null;
-		}
-		if (record == null) {
-			throw new IllegalArgumentException("Empty PactRecord given");
-		}
-		reuseAvroRecord = dataFileReader.next(reuseAvroRecord);
-		final List<Field> fields = reuseAvroRecord.getSchema().getFields();
-		for (Field field : fields) {
-			final Value value = convertAvroToPactValue(field, reuseAvroRecord.get(field.pos()));
-			record.setField(field.pos(), value);
-			record.updateBinaryRepresenation();
-		}
-
-		return record;
-	}
-
-
-	@SuppressWarnings("unchecked")
-	private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
-		if (avroRecord == null) {
-			return null;
-		}
-		final Type type = checkTypeConstraintsAndGetType(field.schema());
-
-		// check for complex types
-		// (complex type FIXED is not yet supported)
-		switch (type) {
-			case ARRAY:
-				final Type elementType = field.schema().getElementType().getType();
-				final List<?> avroList = (List<?>) avroRecord;
-				return convertAvroArrayToListValue(elementType, avroList);
-			case ENUM:
-				final List<String> symbols = field.schema().getEnumSymbols();
-				final String avroRecordString = avroRecord.toString();
-				if (!symbols.contains(avroRecordString)) {
-					throw new RuntimeException("The given Avro file contains field with a invalid enum symbol");
-				}
-				sString.setValue(avroRecordString);
-				return sString;
-			case MAP:
-				final Type valueType = field.schema().getValueType().getType();
-				final Map<CharSequence, ?> avroMap = (Map<CharSequence, ?>) avroRecord;
-				return convertAvroMapToMapValue(valueType, avroMap);
-	
-			// primitive type
-			default:
-				return convertAvroPrimitiveToValue(type, avroRecord);
-
-		}
-	}
-
-	private final ListValue<?> convertAvroArrayToListValue(Type elementType, List<?> avroList) {
-		switch (elementType) {
-		case STRING:
-			StringListValue sl = new StringListValue();
-			for (Object item : avroList) {
-				sl.add(new StringValue((CharSequence) item));
-			}
-			return sl;
-		case INT:
-			IntListValue il = new IntListValue();
-			for (Object item : avroList) {
-				il.add(new IntValue((Integer) item));
-			}
-			return il;
-		case BOOLEAN:
-			BooleanListValue bl = new BooleanListValue();
-			for (Object item : avroList) {
-				bl.add(new BooleanValue((Boolean) item));
-			}
-			return bl;
-		case DOUBLE:
-			DoubleListValue dl = new DoubleListValue();
-			for (Object item : avroList) {
-				dl.add(new DoubleValue((Double) item));
-			}
-			return dl;
-		case FLOAT:
-			FloatListValue fl = new FloatListValue();
-			for (Object item : avroList) {
-				fl.add(new FloatValue((Float) item));
-			}
-			return fl;
-		case LONG:
-			LongListValue ll = new LongListValue();
-			for (Object item : avroList) {
-				ll.add(new LongValue((Long) item));
-			}
-			return ll;
-		default:
-			throw new RuntimeException("Elements of type " + elementType + " are not supported for Avro arrays.");
-		}
-	}
-
-	private final MapValue<StringValue, ?> convertAvroMapToMapValue(Type mapValueType, Map<CharSequence, ?> avroMap) {
-		switch (mapValueType) {
-		case STRING:
-			StringMapValue sm = new StringMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				sm.put(new StringValue((CharSequence) entry.getKey()), new StringValue((String) entry.getValue()));
-			}
-			return sm;
-		case INT:
-			IntMapValue im = new IntMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				im.put(new StringValue((CharSequence) entry.getKey()), new IntValue((Integer) entry.getValue()));
-			}
-			return im;
-		case BOOLEAN:
-			BooleanMapValue bm = new BooleanMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				bm.put(new StringValue((CharSequence) entry.getKey()), new BooleanValue((Boolean) entry.getValue()));
-			}
-			return bm;
-		case DOUBLE:
-			DoubleMapValue dm = new DoubleMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				dm.put(new StringValue((CharSequence) entry.getKey()), new DoubleValue((Double) entry.getValue()));
-			}
-			return dm;
-		case FLOAT:
-			FloatMapValue fm = new FloatMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				fm.put(new StringValue((CharSequence) entry.getKey()), new FloatValue((Float) entry.getValue()));
-			}
-			return fm;
-		case LONG:
-			LongMapValue lm = new LongMapValue();
-			for (Map.Entry<CharSequence, ?> entry : avroMap.entrySet()) {
-				lm.put(new StringValue((CharSequence) entry.getKey()), new LongValue((Long) entry.getValue()));
-			}
-			return lm;
-
-		default:
-			throw new RuntimeException("Map values of type " + mapValueType + " are not supported for Avro map.");
-		}
-	}
-
-	private StringValue sString = new StringValue();
-	private IntValue sInt = new IntValue();
-	private BooleanValue sBool = new BooleanValue();
-	private DoubleValue sDouble = new DoubleValue();
-	private FloatValue sFloat = new FloatValue();
-	private LongValue sLong = new LongValue();
-	
-	private final Value convertAvroPrimitiveToValue(Type type, Object avroRecord) {
-		switch (type) {
-		case STRING:
-			sString.setValue((CharSequence) avroRecord);
-			return sString;
-		case INT:
-			sInt.setValue((Integer) avroRecord);
-			return sInt;
-		case BOOLEAN:
-			sBool.setValue((Boolean) avroRecord);
-			return sBool;
-		case DOUBLE:
-			sDouble.setValue((Double) avroRecord);
-			return sDouble;
-		case FLOAT:
-			sFloat.setValue((Float) avroRecord);
-			return sFloat;
-		case LONG:
-			sLong.setValue((Long) avroRecord);
-			return sLong;
-		case NULL:
-			return NullValue.getInstance();
-		default:
-			throw new RuntimeException(
-					"Type "
-							+ type
-							+ " for AvroInputFormat is not implemented. Open an issue on GitHub.");
-		}
-	}
-
-	private final Type checkTypeConstraintsAndGetType(final Schema schema) {
-		final Type type = schema.getType();
-		if (type == Type.RECORD) {
-			throw new RuntimeException("The given Avro file contains complex data types which are not supported right now");
-		}
-
-		if (type == Type.UNION) {
-			List<Schema> types = schema.getTypes();
-			if (types.size() > 2) {
-				throw new RuntimeException("The given Avro file contains a union that has more than two elements");
-			}
-			if (types.size() == 1 && types.get(0).getType() != Type.UNION) {
-				return types.get(0).getType();
-			}
-			if (types.get(0).getType() == Type.UNION || types.get(1).getType() == Type.UNION) {
-				throw new RuntimeException("The given Avro file contains a nested union");
-			}
-			if (types.get(0).getType() == Type.NULL) {
-				return types.get(1).getType();
-			} else {
-				if (types.get(1).getType() != Type.NULL) {
-					throw new RuntimeException("The given Avro file is contains a union with two non-null types.");
-				}
-				return types.get(0).getType();
-			}
-		}
-		return type;
-	}
-
-	/**
-	 * Set minNumSplits to number of files.
-	 */
-	@Override
-	public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		int numAvroFiles = 0;
-		final Path path = this.filePath;
-		// get all the files that are involved in the splits
-		final FileSystem fs = path.getFileSystem();
-		final FileStatus pathFile = fs.getFileStatus(path);
-
-		if (!acceptFile(pathFile)) {
-			throw new IOException("The given file does not pass the file-filter");
-		}
-		if (pathFile.isDir()) {
-			// input is directory. list all contained files
-			final FileStatus[] dir = fs.listStatus(path);
-			for (int i = 0; i < dir.length; i++) {
-				if (!dir[i].isDir() && acceptFile(dir[i])) {
-					numAvroFiles++;
-				}
-			}
-		} else {
-			numAvroFiles = 1;
-		}
-		return super.createInputSplits(numAvroFiles);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Concrete subclasses of ListValue and MapValue for all possible primitive types
-	// --------------------------------------------------------------------------------------------
-
-	public static class StringListValue extends ListValue<StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class IntListValue extends ListValue<IntValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class BooleanListValue extends ListValue<BooleanValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class DoubleListValue extends ListValue<DoubleValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class FloatListValue extends ListValue<FloatValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class LongListValue extends ListValue<LongValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class StringMapValue extends MapValue<StringValue, StringValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class IntMapValue extends MapValue<StringValue, IntValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class BooleanMapValue extends MapValue<StringValue, BooleanValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class DoubleMapValue extends MapValue<StringValue, DoubleValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class FloatMapValue extends MapValue<StringValue, FloatValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-	public static class LongMapValue extends MapValue<StringValue, LongValue> {
-		private static final long serialVersionUID = 1L;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
deleted file mode 100644
index d6d1213..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/ReflectiveAvroTypeExample.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Random;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-
-public class ReflectiveAvroTypeExample {
-	
-	
-	public static void main(String[] args) throws Exception {
-		
-		GenericDataSource<UserGeneratingInputFormat> source = new GenericDataSource<UserGeneratingInputFormat>(UserGeneratingInputFormat.class);
-		
-		MapOperator mapper = MapOperator.builder(new NumberExtractingMapper())
-				.input(source).name("le mapper").build();
-		
-		ReduceOperator reducer = ReduceOperator.builder(new ConcatenatingReducer(), IntValue.class, 1)
-				.input(mapper).name("le reducer").build();
-		
-		GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, reducer);
-		
-		Plan p = new Plan(sink);
-		p.setDefaultParallelism(4);
-		
-		LocalExecutor.execute(p);
-	}
-	
-	
-	public static final class NumberExtractingMapper extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			User u = record.getField(0, SUser.class).datum();
-			record.setField(1, new IntValue(u.getFavoriteNumber()));
-			out.collect(record);
-		}
-	}
-	
-	
-	public static final class ConcatenatingReducer extends ReduceFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		private final Record result = new Record(2);
-
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-			Record r = records.next();
-			
-			int num = r.getField(1, IntValue.class).getValue();
-			String names = r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-			
-			while (records.hasNext()) {
-				r = records.next();
-				names += " - " + r.getField(0, SUser.class).datum().getFavoriteColor().toString();
-			}
-			
-			result.setField(0, new IntValue(num));
-			result.setField(1,  new StringValue(names));
-			out.collect(result);
-		}
-
-	}
-	
-	
-	public static final class UserGeneratingInputFormat extends GenericInputFormat {
-
-		private static final long serialVersionUID = 1L;
-		
-		private static final int NUM = 100;
-		
-		private final Random rnd = new Random(32498562304986L);
-		
-		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-		
-		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-		
-		private int count;
-		
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return count >= NUM;
-		}
-
-		@Override
-		public Record nextRecord(Record record) throws IOException {
-			count++;
-			
-			User u = new User();
-			u.setName(NAMES[rnd.nextInt(NAMES.length)]);
-			u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
-			u.setFavoriteNumber(rnd.nextInt(87));
-			
-			SUser su = new SUser();
-			su.datum(u);
-			
-			record.setField(0, su);
-			return record;
-		}
-	}
-	
-	public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void configure(Configuration parameters) {}
-
-		@Override
-		public void open(int taskNumber, int numTasks) throws IOException {}
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			int color = record.getField(0, IntValue.class).getValue();
-			String names = record.getField(1, StringValue.class).getValue();
-			
-			System.out.println(color + ": " + names);
-		}
-		
-		@Override
-		public void close() throws IOException {}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
deleted file mode 100644
index 542639e..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/SUser.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.avro.example;
-
-import org.apache.flink.api.avro.AvroBaseValue;
-
-public class SUser extends AvroBaseValue<User> {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java b/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
deleted file mode 100644
index d4fb292..0000000
--- a/flink-addons/flink-avro/src/main/java/org/apache/flink/api/java/record/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.example;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link \#newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder() {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.java.record.io.avro.example.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.example.User other) {
-    return new org.apache.flink.api.java.record.io.avro.example.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.java.record.io.avro.example.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.java.record.io.avro.example.User other) {
-            super(org.apache.flink.api.java.record.io.avro.example.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.example.User.Builder clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
index a60787d..db235c0 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
@@ -28,11 +28,11 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.record.io.avro.example.User;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.test.util.JavaProgramTestBase;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
deleted file mode 100644
index a3850e2..0000000
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
-
-	@Override
-	protected Plan getTestJob() {
-		GenericDataSource<RandomInputFormat> bookSource = new GenericDataSource<RandomInputFormat>(
-			new RandomInputFormat(true));
-		GenericDataSource<RandomInputFormat> authorSource = new GenericDataSource<RandomInputFormat>(
-			new RandomInputFormat(false));
-
-		CoGroupOperator coGroupOperator = CoGroupOperator.builder(MyCoGrouper.class, LongValue.class, 0, 0)
-			.input1(bookSource).input2(authorSource).name("CoGrouper Test").build();
-
-		GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, coGroupOperator);
-
-		Plan plan = new Plan(sink, "CoGroper Test Plan");
-		plan.setDefaultParallelism(1);
-		return plan;
-	}
-
-	public static class SBookAvroValue extends AvroBaseValue<Book> {
-		private static final long serialVersionUID = 1L;
-
-		public SBookAvroValue() {}
-
-		public SBookAvroValue(Book datum) {
-			super(datum);
-		}
-	}
-
-	public static class Book {
-
-		long bookId;
-		@Nullable
-		String title;
-		long authorId;
-
-		public Book() {
-		}
-
-		public Book(long bookId, String title, long authorId) {
-			this.bookId = bookId;
-			this.title = title;
-			this.authorId = authorId;
-		}
-	}
-
-	public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
-		private static final long serialVersionUID = 1L;
-
-		public SBookAuthorValue() {}
-
-		public SBookAuthorValue(BookAuthor datum) {
-			super(datum);
-		}
-	}
-
-	public static class BookAuthor {
-
-		enum BookType {
-			book,
-			article,
-			journal
-		}
-
-		long authorId;
-
-		@Nullable
-		List<String> bookTitles;
-
-		@Nullable
-		List<Book> books;
-
-		String authorName;
-
-		BookType bookType;
-
-		public BookAuthor() {}
-
-		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
-			this.authorId = authorId;
-			this.bookTitles = bookTitles;
-			this.authorName = authorName;
-		}
-	}
-
-	public static class RandomInputFormat extends GenericInputFormat {
-		private static final long serialVersionUID = 1L;
-
-		private final boolean isBook;
-
-		private boolean touched = false;
-
-		public RandomInputFormat(boolean isBook) {
-			this.isBook = isBook;
-		}
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return touched;
-		}
-
-		@Override
-		public Record nextRecord(Record record) throws IOException {
-			touched = true;
-			record.setField(0, new LongValue(26382648));
-
-			if (isBook) {
-				Book b = new Book(123, "This is a test book", 26382648);
-				record.setField(1, new SBookAvroValue(b));
-			} else {
-				List<String> titles = new ArrayList<String>();
-				// titles.add("Title1");
-				// titles.add("Title2");
-				// titles.add("Title3");
-
-				List<Book> books = new ArrayList<Book>();
-				books.add(new Book(123, "This is a test book", 1));
-				books.add(new Book(24234234, "This is a test book", 1));
-				books.add(new Book(1234324, "This is a test book", 3));
-
-				BookAuthor a = new BookAuthor(1, titles, "Test Author");
-				a.books = books;
-				a.bookType = BookAuthor.BookType.journal;
-				record.setField(1, new SBookAuthorValue(a));
-			}
-
-			return record;
-		}
-	}
-
-	public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void configure(Configuration parameters) {}
-
-		@Override
-		public void open(int taskNumber, int numTasks) {}
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			long key = record.getField(0, LongValue.class).getValue();
-			String val = record.getField(1, StringValue.class).getValue();
-			System.out.println(key + " : " + val);
-		}
-
-		@Override
-		public void close() {}
-	}
-
-	public static class MyCoGrouper extends CoGroupFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
-
-			Record r1 = null;
-			while (records1.hasNext()) {
-				r1 = records1.next();
-			}
-			Record r2 = null;
-			while (records2.hasNext()) {
-				r2 = records2.next();
-			}
-
-			if (r1 != null) {
-				r1.getField(1, SBookAvroValue.class).datum();
-			}
-
-			if (r2 != null) {
-				r2.getField(1, SBookAuthorValue.class).datum();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index 78ff2f1..74b5397 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -31,8 +31,8 @@ import java.util.Random;
 
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.util.StringUtils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
index 31e083f..25e2e0c 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
@@ -39,7 +39,6 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.AvroBaseValue;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -125,18 +124,6 @@ public class AvroExternalJarProgram  {
 		}
 	}
 	
-	
-	public static final class SUser extends AvroBaseValue<MyUser> {
-		
-		static final long serialVersionUID = 1L;
-
-		public SUser() {}
-	
-		public SUser(MyUser u) {
-			super(u);
-		}
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
new file mode 100644
index 0000000..d8d8b46
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.io.avro;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that {@link AvroInputFormat} can read back Avro-generated {@link User} records.
+ * (The test case mostly follows the Avro "Getting Started" tutorial for Java.)
+ * http://avro.apache.org/docs/current/gettingstartedjava.html
+ */
+public class AvroRecordInputFormatTest {
+	
+	private File testFile;
+	
+	final static String TEST_NAME = "Alyssa";
+	
+	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
+	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
+	
+	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
+	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
+	
+	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
+	
+	final static String TEST_MAP_KEY1 = "KEY 1";
+	final static long TEST_MAP_VALUE1 = 8546456L;
+	final static String TEST_MAP_KEY2 = "KEY 2";
+	final static long TEST_MAP_VALUE2 = 17554L;
+
+	@Before
+	public void createFiles() throws IOException {
+		testFile = File.createTempFile("AvroInputFormatTest", null);
+		
+		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		stringArray.add(TEST_ARRAY_STRING_1);
+		stringArray.add(TEST_ARRAY_STRING_2);
+		
+		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
+		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
+		
+		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
+		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
+		
+		// Construct via setters; this is the record the test inspects field by field
+		User user1 = new User();
+		user1.setName(TEST_NAME);
+		user1.setFavoriteNumber(256);
+		user1.setTypeDoubleTest(123.45d);
+		user1.setTypeBoolTest(true);
+		user1.setTypeArrayString(stringArray);
+		user1.setTypeArrayBoolean(booleanArray);
+		user1.setTypeEnum(TEST_ENUM_COLOR);
+		user1.setTypeMap(longMap);
+		
+		// Construct via builder
+		User user2 = User.newBuilder()
+		             .setName("Charlie")
+		             .setFavoriteColor("blue")
+		             .setFavoriteNumber(null)
+		             .setTypeBoolTest(false)
+		             .setTypeDoubleTest(1.337d)
+		             .setTypeNullTest(null)
+		             .setTypeLongTest(1337L)
+		             .setTypeArrayString(new ArrayList<CharSequence>())
+		             .setTypeArrayBoolean(new ArrayList<Boolean>())
+		             .setTypeNullableArray(null)
+		             .setTypeEnum(Colors.RED)
+		             .setTypeMap(new HashMap<CharSequence, Long>())
+		             .build();
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		dataFileWriter.create(user1.getSchema(), testFile);
+		dataFileWriter.append(user1);
+		dataFileWriter.append(user2);
+		dataFileWriter.close();
+	}
+	
+	@Test
+	public void testDeserialisation() throws IOException {
+		Configuration parameters = new Configuration();
+		
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		
+		format.configure(parameters);
+		FileInputSplit[] splits = format.createInputSplits(1);
+		assertEquals(1, splits.length);
+		format.open(splits[0]);
+		
+		User u = format.nextRecord(null);
+		assertNotNull(u);
+		
+		String name = u.getName().toString();
+		assertNotNull("empty record", name);
+		assertEquals("name not equal", TEST_NAME, name);
+		
+		// check arrays
+		List<CharSequence> sl = u.getTypeArrayString();
+		assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
+		assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
+		
+		List<Boolean> bl = u.getTypeArrayBoolean();
+		assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
+		assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
+		
+		// check enums
+		Colors enumValue = u.getTypeEnum();
+		assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
+		
+		// check maps; lookups use Utf8 keys (presumably the key type Avro produces on read - confirm)
+		Map<CharSequence, Long> lm = u.getTypeMap();
+		assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
+		assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
+		
+		// createFiles() wrote exactly two records: expect one more, then end of input
+		assertFalse("expecting second element", format.reachedEnd());
+		assertNotNull("expecting second element", format.nextRecord(u));
+		
+		assertNull(format.nextRecord(u));
+		assertTrue(format.reachedEnd());
+		
+		format.close();
+	}
+	
+	@After
+	public void deleteFiles() {
+		testFile.delete();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/626d6b78/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
new file mode 100644
index 0000000..58e1f5c
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Autogenerated by Avro
+ * 
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.flink.api.io.avro.generated;  
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public enum Colors { 
+  RED, GREEN, BLUE  ;
+  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
+  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
+}