You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:50 UTC

[71/73] [abbrv] prefix all projects in addons and quickstarts with flink-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
deleted file mode 100644
index 4a6e7f1..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-
-import org.apache.flink.client.minicluster.NepheleMiniCluster;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.LogUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class AvroExternalJarProgramITCase {
-
-	private static final int TEST_JM_PORT = 43191;
-	
-	private static final String JAR_FILE = "target/maven-test-jar.jar";
-	
-	private static final String TEST_DATA_FILE = "/testdata.avro";
-
-	static {
-		LogUtils.initializeDefaultTestConsoleLogger();
-	}
-	
-	@Test
-	public void testExternalProgram() {
-		
-		NepheleMiniCluster testMiniCluster = null;
-		
-		try {
-			testMiniCluster = new NepheleMiniCluster();
-			testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
-			testMiniCluster.setTaskManagerNumSlots(4);
-			testMiniCluster.start();
-			
-			String jarFile = JAR_FILE;
-			String testData = getClass().getResource(TEST_DATA_FILE).toString();
-			
-			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
-						
-			Client c = new Client(new InetSocketAddress("localhost", TEST_JM_PORT), new Configuration());
-			c.run(program, 4, true);
-		}
-		catch (Throwable t) {
-			System.err.println(t.getMessage());
-			t.printStackTrace();
-			Assert.fail("Error during the packaged program execution: " + t.getMessage());
-		}
-		finally {
-			if (testMiniCluster != null) {
-				try {
-					testMiniCluster.stop();
-				} catch (Throwable t) {}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
deleted file mode 100644
index 637a5e9..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.record.io.avro.example.User;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class AvroOutputFormatTest extends JavaProgramTestBase {
-
-	public static String outputPath1;
-
-	public static String outputPath2;
-
-	public static String inputPath;
-
-	public static String userData = "alice|1|blue\n" +
-		"bob|2|red\n" +
-		"john|3|yellow\n" +
-		"walt|4|black\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		inputPath = createTempFile("user", userData);
-		outputPath1 = getTempDirPath("avro_output1");
-		outputPath2 = getTempDirPath("avro_output2");
-	}
-
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
-			.fieldDelimiter('|')
-			.types(String.class, Integer.class, String.class);
-
-		//output the data with AvroOutputFormat for specific user type
-		DataSet<User> specificUser = input.map(new ConvertToUser());
-		specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
-
-		//output the data with AvroOutputFormat for reflect user type
-		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
-		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		//compare result for specific user type
-		File [] output1;
-		File file1 = asFile(outputPath1);
-		if (file1.isDirectory()) {
-			output1 = file1.listFiles();
-		} else {
-			output1 = new File[] {file1};
-		}
-		List<String> result1 = new ArrayList<String>();
-		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
-		for (File avroOutput : output1) {
-			DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
-			while (dataFileReader1.hasNext()) {
-				User user = dataFileReader1.next();
-				result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-			}
-		}
-		for (String expectedResult : userData.split("\n")) {
-			Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
-		}
-
-		//compare result for reflect user type
-		File [] output2;
-		File file2 = asFile(outputPath2);
-		if (file2.isDirectory()) {
-			output2 = file2.listFiles();
-		} else {
-			output2 = new File[] {file2};
-		}
-		List<String> result2 = new ArrayList<String>();
-		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
-		for (File avroOutput : output2) {
-			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
-			while (dataFileReader2.hasNext()) {
-				ReflectiveUser user = dataFileReader2.next();
-				result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-			}
-		}
-		for (String expectedResult : userData.split("\n")) {
-			Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
-		}
-
-
-	}
-
-
-	public final static class ConvertToUser extends MapFunction<Tuple3<String, Integer, String>, User> {
-
-		@Override
-		public User map(Tuple3<String, Integer, String> value) throws Exception {
-			return new User(value.f0, value.f1, value.f2);
-		}
-	}
-
-	public final static class ConvertToReflective extends MapFunction<User, ReflectiveUser> {
-
-		@Override
-		public ReflectiveUser map(User value) throws Exception {
-			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
-		}
-	}
-
-	
-	public static class ReflectiveUser {
-		private String name;
-		private int favoriteNumber;
-		private String favoriteColor;
-
-		public ReflectiveUser() {}
-
-		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
-			this.name = name;
-			this.favoriteNumber = favoriteNumber;
-			this.favoriteColor = favoriteColor;
-		}
-		
-		public String getName() {
-			return this.name;
-		}
-		public String getFavoriteColor() {
-			return this.favoriteColor;
-		}
-		public int getFavoriteNumber() {
-			return this.favoriteNumber;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
deleted file mode 100644
index ea9edff..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/AvroWithEmptyArrayITCase.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.io.GenericInputFormat;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-
-public class AvroWithEmptyArrayITCase extends RecordAPITestBase {
-
-	@Override
-	protected Plan getTestJob() {
-		GenericDataSource<RandomInputFormat> bookSource = new GenericDataSource<RandomInputFormat>(
-			new RandomInputFormat(true));
-		GenericDataSource<RandomInputFormat> authorSource = new GenericDataSource<RandomInputFormat>(
-			new RandomInputFormat(false));
-
-		CoGroupOperator coGroupOperator = CoGroupOperator.builder(MyCoGrouper.class, LongValue.class, 0, 0)
-			.input1(bookSource).input2(authorSource).name("CoGrouper Test").build();
-
-		GenericDataSink sink = new GenericDataSink(PrintingOutputFormat.class, coGroupOperator);
-
-		Plan plan = new Plan(sink, "CoGroper Test Plan");
-		plan.setDefaultParallelism(1);
-		return plan;
-	}
-
-	public static class SBookAvroValue extends AvroBaseValue<Book> {
-		private static final long serialVersionUID = 1L;
-
-		public SBookAvroValue() {}
-
-		public SBookAvroValue(Book datum) {
-			super(datum);
-		}
-	}
-
-	public static class Book {
-
-		long bookId;
-		@Nullable
-		String title;
-		long authorId;
-
-		public Book() {
-		}
-
-		public Book(long bookId, String title, long authorId) {
-			this.bookId = bookId;
-			this.title = title;
-			this.authorId = authorId;
-		}
-	}
-
-	public static class SBookAuthorValue extends AvroBaseValue<BookAuthor> {
-		private static final long serialVersionUID = 1L;
-
-		public SBookAuthorValue() {}
-
-		public SBookAuthorValue(BookAuthor datum) {
-			super(datum);
-		}
-	}
-
-	public static class BookAuthor {
-
-		enum BookType {
-			book,
-			article,
-			journal
-		}
-
-		long authorId;
-
-		@Nullable
-		List<String> bookTitles;
-
-		@Nullable
-		List<Book> books;
-
-		String authorName;
-
-		BookType bookType;
-
-		public BookAuthor() {}
-
-		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
-			this.authorId = authorId;
-			this.bookTitles = bookTitles;
-			this.authorName = authorName;
-		}
-	}
-
-	public static class RandomInputFormat extends GenericInputFormat {
-		private static final long serialVersionUID = 1L;
-
-		private final boolean isBook;
-
-		private boolean touched = false;
-
-		public RandomInputFormat(boolean isBook) {
-			this.isBook = isBook;
-		}
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return touched;
-		}
-
-		@Override
-		public Record nextRecord(Record record) throws IOException {
-			touched = true;
-			record.setField(0, new LongValue(26382648));
-
-			if (isBook) {
-				Book b = new Book(123, "This is a test book", 26382648);
-				record.setField(1, new SBookAvroValue(b));
-			} else {
-				List<String> titles = new ArrayList<String>();
-				// titles.add("Title1");
-				// titles.add("Title2");
-				// titles.add("Title3");
-
-				List<Book> books = new ArrayList<Book>();
-				books.add(new Book(123, "This is a test book", 1));
-				books.add(new Book(24234234, "This is a test book", 1));
-				books.add(new Book(1234324, "This is a test book", 3));
-
-				BookAuthor a = new BookAuthor(1, titles, "Test Author");
-				a.books = books;
-				a.bookType = BookAuthor.BookType.journal;
-				record.setField(1, new SBookAuthorValue(a));
-			}
-
-			return record;
-		}
-	}
-
-	public static final class PrintingOutputFormat implements OutputFormat<Record> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void configure(Configuration parameters) {}
-
-		@Override
-		public void open(int taskNumber, int numTasks) {}
-
-		@Override
-		public void writeRecord(Record record) throws IOException {
-			long key = record.getField(0, LongValue.class).getValue();
-			String val = record.getField(1, StringValue.class).getValue();
-			System.out.println(key + " : " + val);
-		}
-
-		@Override
-		public void close() {}
-	}
-
-	public static class MyCoGrouper extends CoGroupFunction {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out)
-				throws Exception {
-
-			Record r1 = null;
-			if (records1.hasNext()) {
-				r1 = records1.next();
-			}
-			Record r2 = null;
-			if (records2.hasNext()) {
-				r2 = records2.next();
-			}
-
-			if (r1 != null) {
-				r1.getField(1, SBookAvroValue.class).datum();
-			}
-
-			if (r2 != null) {
-				r2.getField(1, SBookAuthorValue.class).datum();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
deleted file mode 100644
index 76b23ef..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.DataInputDecoder;
-import org.apache.flink.api.avro.DataOutputEncoder;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
-import org.apache.flink.util.StringUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
-/**
- * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
- */
-public class EncoderDecoderTest {
-	
-	@Test
-	public void testComplexStringsDirecty() {
-		try {
-			Random rnd = new Random(349712539451944123L);
-			
-			for (int i = 0; i < 10; i++) {
-				String testString = StringUtils.getRandomString(rnd, 10, 100);
-				
-				ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
-				{
-					DataOutputStream dataOut = new DataOutputStream(baos);
-					DataOutputEncoder encoder = new DataOutputEncoder();
-					encoder.setOut(dataOut);
-					
-					encoder.writeString(testString);
-					dataOut.flush();
-					dataOut.close();
-				}
-				
-				byte[] data = baos.toByteArray();
-				
-				// deserialize
-				{
-					ByteArrayInputStream bais = new ByteArrayInputStream(data);
-					DataInputStream dataIn = new DataInputStream(bais);
-					DataInputDecoder decoder = new DataInputDecoder();
-					decoder.setIn(dataIn);
-	
-					String deserialized = decoder.readString();
-					
-					assertEquals(testString, deserialized);
-				}
-			}
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test failed due to an exception: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testPrimitiveTypes() {
-		
-		testObjectSerialization(new Boolean(true));
-		testObjectSerialization(new Boolean(false));
-		
-		testObjectSerialization(new Byte((byte) 0));
-		testObjectSerialization(new Byte((byte) 1));
-		testObjectSerialization(new Byte((byte) -1));
-		testObjectSerialization(new Byte(Byte.MIN_VALUE));
-		testObjectSerialization(new Byte(Byte.MAX_VALUE));
-		
-		testObjectSerialization(new Short((short) 0));
-		testObjectSerialization(new Short((short) 1));
-		testObjectSerialization(new Short((short) -1));
-		testObjectSerialization(new Short(Short.MIN_VALUE));
-		testObjectSerialization(new Short(Short.MAX_VALUE));
-		
-		testObjectSerialization(new Integer(0));
-		testObjectSerialization(new Integer(1));
-		testObjectSerialization(new Integer(-1));
-		testObjectSerialization(new Integer(Integer.MIN_VALUE));
-		testObjectSerialization(new Integer(Integer.MAX_VALUE));
-		
-		testObjectSerialization(new Long(0));
-		testObjectSerialization(new Long(1));
-		testObjectSerialization(new Long(-1));
-		testObjectSerialization(new Long(Long.MIN_VALUE));
-		testObjectSerialization(new Long(Long.MAX_VALUE));
-		
-		testObjectSerialization(new Float(0));
-		testObjectSerialization(new Float(1));
-		testObjectSerialization(new Float(-1));
-		testObjectSerialization(new Float((float)Math.E));
-		testObjectSerialization(new Float((float)Math.PI));
-		testObjectSerialization(new Float(Float.MIN_VALUE));
-		testObjectSerialization(new Float(Float.MAX_VALUE));
-		testObjectSerialization(new Float(Float.MIN_NORMAL));
-		testObjectSerialization(new Float(Float.NaN));
-		testObjectSerialization(new Float(Float.NEGATIVE_INFINITY));
-		testObjectSerialization(new Float(Float.POSITIVE_INFINITY));
-		
-		testObjectSerialization(new Double(0));
-		testObjectSerialization(new Double(1));
-		testObjectSerialization(new Double(-1));
-		testObjectSerialization(new Double(Math.E));
-		testObjectSerialization(new Double(Math.PI));
-		testObjectSerialization(new Double(Double.MIN_VALUE));
-		testObjectSerialization(new Double(Double.MAX_VALUE));
-		testObjectSerialization(new Double(Double.MIN_NORMAL));
-		testObjectSerialization(new Double(Double.NaN));
-		testObjectSerialization(new Double(Double.NEGATIVE_INFINITY));
-		testObjectSerialization(new Double(Double.POSITIVE_INFINITY));
-		
-		testObjectSerialization("");
-		testObjectSerialization("abcdefg");
-		testObjectSerialization("ab\u1535\u0155xyz\u706F");
-		
-		testObjectSerialization(new SimpleTypes(3637, 54876486548L, (byte) 65, "We're out looking for astronauts", (short) 0x2387, 2.65767523));
-		testObjectSerialization(new SimpleTypes(705608724, -1L, (byte) -65, "Serve me the sky with a big slice of lemon", (short) Byte.MIN_VALUE, 0.0000001));
-	}
-	
-	@Test
-	public void testArrayTypes() {
-		{
-			int[] array = new int[] {1, 2, 3, 4, 5};
-			testObjectSerialization(array);
-		}
-		{
-			long[] array = new long[] {1, 2, 3, 4, 5};
-			testObjectSerialization(array);
-		}
-		{
-			float[] array = new float[] {1, 2, 3, 4, 5};
-			testObjectSerialization(array);
-		}
-		{
-			double[] array = new double[] {1, 2, 3, 4, 5};
-			testObjectSerialization(array);
-		}
-		{
-			String[] array = new String[] {"Oh", "my", "what", "do", "we", "have", "here", "?"};
-			testObjectSerialization(array);
-		}
-	}
-	
-	@Test
-	public void testEmptyArray() {
-		{
-			int[] array = new int[0];
-			testObjectSerialization(array);
-		}
-		{
-			long[] array = new long[0];
-			testObjectSerialization(array);
-		}
-		{
-			float[] array = new float[0];
-			testObjectSerialization(array);
-		}
-		{
-			double[] array = new double[0];
-			testObjectSerialization(array);
-		}
-		{
-			String[] array = new String[0];
-			testObjectSerialization(array);
-		}
-	}
-	
-	@Test
-	public void testObjects() {
-		// simple object containing only primitives
-		{
-			testObjectSerialization(new Book(976243875L, "The Serialization Odysse", 42));
-		}
-		
-		// object with collection
-		{
-			ArrayList<String> list = new ArrayList<String>();
-			list.add("A");
-			list.add("B");
-			list.add("C");
-			list.add("D");
-			list.add("E");
-			
-			testObjectSerialization(new BookAuthor(976243875L, list, "Arno Nym"));
-		}
-		
-		// object with empty collection
-		{
-			ArrayList<String> list = new ArrayList<String>();
-			testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
-		}
-	}
-	
-	@Test
-	public void testNestedObjectsWithCollections() {
-		testObjectSerialization(new ComplexNestedObject2(true));
-	}
-	
-	@Test
-	public void testGeneratedObjectWithNullableFields() {
-		List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
-		List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
-		Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
-		map.put("1", 1L);
-		map.put("2", 2L);
-		map.put("3", 3L);
-		
-		User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map);
-		
-		testObjectSerialization(user);
-	}
-	
-	@Test
-	public void testVarLenCountEncoding() {
-		try {
-			long[] values = new long[] { 0, 1, 2, 3, 4, 0, 574, 45236, 0, 234623462, 23462462346L, 0, 9734028767869761L, 0x7fffffffffffffffL};
-			
-			// write
-			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
-			{
-				DataOutputStream dataOut = new DataOutputStream(baos);
-				
-				for (long val : values) {
-					DataOutputEncoder.writeVarLongCount(dataOut, val);
-				}
-				
-				dataOut.flush();
-				dataOut.close();
-			}
-			
-			// read
-			{
-				ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-				DataInputStream dataIn = new DataInputStream(bais);
-				
-				for (long val : values) {
-					long read = DataInputDecoder.readVarLongCount(dataIn);
-					assertEquals("Wrong var-len encoded value read.", val, read);
-				}
-			}
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test failed due to an exception: " + e.getMessage());
-		}
-	}
-	
-	private static <X> void testObjectSerialization(X obj) {
-		
-		try {
-			
-			// serialize
-			ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
-			{
-				DataOutputStream dataOut = new DataOutputStream(baos);
-				DataOutputEncoder encoder = new DataOutputEncoder();
-				encoder.setOut(dataOut);
-				
-				@SuppressWarnings("unchecked")
-				Class<X> clazz = (Class<X>) obj.getClass();
-				ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
-				
-				writer.write(obj, encoder);
-				dataOut.flush();
-				dataOut.close();
-			}
-			
-			byte[] data = baos.toByteArray();
-			X result = null;
-			
-			// deserialize
-			{
-				ByteArrayInputStream bais = new ByteArrayInputStream(data);
-				DataInputStream dataIn = new DataInputStream(bais);
-				DataInputDecoder decoder = new DataInputDecoder();
-				decoder.setIn(dataIn);
-
-				@SuppressWarnings("unchecked")
-				Class<X> clazz = (Class<X>) obj.getClass();
-				ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
-				
-				// create a reuse object if possible, otherwise we have no reuse object 
-				X reuse = null;
-				try {
-					@SuppressWarnings("unchecked")
-					X test = (X) obj.getClass().newInstance();
-					reuse = test;
-				} catch (Throwable t) {}
-				
-				result = reader.read(reuse, decoder);
-			}
-			
-			// check
-			final String message = "Deserialized object is not the same as the original";
-			
-			if (obj.getClass().isArray()) {
-				Class<?> clazz = obj.getClass();
-				if (clazz == byte[].class) {
-					assertArrayEquals(message, (byte[]) obj, (byte[]) result);
-				}
-				else if (clazz == short[].class) {
-					assertArrayEquals(message, (short[]) obj, (short[]) result);
-				}
-				else if (clazz == int[].class) {
-					assertArrayEquals(message, (int[]) obj, (int[]) result);
-				}
-				else if (clazz == long[].class) {
-					assertArrayEquals(message, (long[]) obj, (long[]) result);
-				}
-				else if (clazz == char[].class) {
-					assertArrayEquals(message, (char[]) obj, (char[]) result);
-				}
-				else if (clazz == float[].class) {
-					assertArrayEquals(message, (float[]) obj, (float[]) result, 0.0f);
-				}
-				else if (clazz == double[].class) {
-					assertArrayEquals(message, (double[]) obj, (double[]) result, 0.0);
-				} else {
-					assertArrayEquals(message, (Object[]) obj, (Object[]) result);
-				}
-			} else {
-				assertEquals(message, obj, result);
-			}
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test failed due to an exception: " + e.getMessage());
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Test Objects
-	// --------------------------------------------------------------------------------------------
-
-
-	public static final class SimpleTypes {
-		
-		private final int iVal;
-		private final long lVal;
-		private final byte bVal;
-		private final String sVal;
-		private final short rVal;
-		private final double dVal;
-		
-		
-		public SimpleTypes() {
-			this(0, 0, (byte) 0, "", (short) 0, 0);
-		}
-		
-		public SimpleTypes(int iVal, long lVal, byte bVal, String sVal, short rVal, double dVal) {
-			this.iVal = iVal;
-			this.lVal = lVal;
-			this.bVal = bVal;
-			this.sVal = sVal;
-			this.rVal = rVal;
-			this.dVal = dVal;
-		}
-		
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == SimpleTypes.class) {
-				SimpleTypes other = (SimpleTypes) obj;
-				
-				return other.iVal == this.iVal &&
-						other.lVal == this.lVal &&
-						other.bVal == this.bVal &&
-						other.sVal.equals(this.sVal) &&
-						other.rVal == this.rVal &&
-						other.dVal == this.dVal;
-				
-			} else {
-				return false;
-			}
-		}
-	}
-	
-	public static class ComplexNestedObject1 {
-		
-		private double doubleValue;
-		
-		private List<String> stringList;
-		
-		public ComplexNestedObject1() {}
-		
-		public ComplexNestedObject1(int offInit) {
-			this.doubleValue = 6293485.6723 + offInit;
-				
-			this.stringList = new ArrayList<String>();
-			this.stringList.add("A" + offInit);
-			this.stringList.add("somewhat" + offInit);
-			this.stringList.add("random" + offInit);
-			this.stringList.add("collection" + offInit);
-			this.stringList.add("of" + offInit);
-			this.stringList.add("strings" + offInit);
-		}
-		
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == ComplexNestedObject1.class) {
-				ComplexNestedObject1 other = (ComplexNestedObject1) obj;
-				return other.doubleValue == this.doubleValue && this.stringList.equals(other.stringList);
-			} else {
-				return false;
-			}
-		}
-	}
-	
-	public static class ComplexNestedObject2 {
-		
-		private long longValue;
-		
-		private Map<String, ComplexNestedObject1> theMap;
-		
-		public ComplexNestedObject2() {}
-		
-		public ComplexNestedObject2(boolean init) {
-			this.longValue = 46547;
-				
-			this.theMap = new HashMap<String, ComplexNestedObject1>();
-			this.theMap.put("36354L", new ComplexNestedObject1(43546543));
-			this.theMap.put("785611L", new ComplexNestedObject1(45784568));
-			this.theMap.put("43L", new ComplexNestedObject1(9876543));
-			this.theMap.put("-45687L", new ComplexNestedObject1(7897615));
-			this.theMap.put("1919876876896L", new ComplexNestedObject1(27154));
-			this.theMap.put("-868468468L", new ComplexNestedObject1(546435));
-		}
-		
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == ComplexNestedObject2.class) {
-				ComplexNestedObject2 other = (ComplexNestedObject2) obj;
-				return other.longValue == this.longValue && this.theMap.equals(other.theMap);
-			} else {
-				return false;
-			}
-		}
-	}
-	
-	public static class Book {
-
-		private long bookId;
-		private String title;
-		private long authorId;
-
-		public Book() {}
-
-		public Book(long bookId, String title, long authorId) {
-			this.bookId = bookId;
-			this.title = title;
-			this.authorId = authorId;
-		}
-		
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == Book.class) {
-				Book other = (Book) obj;
-				return other.bookId == this.bookId && other.authorId == this.authorId && this.title.equals(other.title);
-			} else {
-				return false;
-			}
-		}
-	}
-
-	public static class BookAuthor {
-
-		private long authorId;
-		private List<String> bookTitles;
-		private String authorName;
-
-		public BookAuthor() {}
-
-		public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
-			this.authorId = authorId;
-			this.bookTitles = bookTitles;
-			this.authorName = authorName;
-		}
-		
-		@Override
-		public boolean equals(Object obj) {
-			if (obj.getClass() == BookAuthor.class) {
-				BookAuthor other = (BookAuthor) obj;
-				return other.authorName.equals(this.authorName) && other.authorId == this.authorId &&
-						other.bookTitles.equals(this.bookTitles);
-			} else {
-				return false;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java b/flink-addons/avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
deleted file mode 100644
index 146c72b..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/avro/testjar/AvroExternalJarProgram.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.avro.testjar;
-
-// ================================================================================================
-//  This file defines the classes for the AvroExternalJarProgramITCase.
-//  The program is exported into src/test/resources/AvroTestProgram.jar.
-//
-//  THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
-//  AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
-//  NOT BE COVERED BY THIS TEST.
-// ================================================================================================
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.flink.api.avro.AvroBaseValue;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.io.DiscardingOuputFormat;
-import org.apache.flink.core.fs.Path;
-
-public class AvroExternalJarProgram  {
-
-	public static final class Color {
-		
-		private String name;
-		private double saturation;
-		
-		public Color() {
-			name = "";
-			saturation = 1.0;
-		}
-		
-		public Color(String name, double saturation) {
-			this.name = name;
-			this.saturation = saturation;
-		}
-		
-		public String getName() {
-			return name;
-		}
-		
-		public void setName(String name) {
-			this.name = name;
-		}
-		
-		public double getSaturation() {
-			return saturation;
-		}
-		
-		public void setSaturation(double saturation) {
-			this.saturation = saturation;
-		}
-		
-		@Override
-		public String toString() {
-			return name + '(' + saturation + ')';
-		}
-	}
-	
-	public static final class MyUser {
-		
-		private String name;
-		private List<Color> colors;
-		
-		public MyUser() {
-			name = "unknown";
-			colors = new ArrayList<Color>();
-		}
-		
-		public MyUser(String name, List<Color> colors) {
-			this.name = name;
-			this.colors = colors;
-		}
-		
-		public String getName() {
-			return name;
-		}
-		
-		public List<Color> getColors() {
-			return colors;
-		}
-		
-		public void setName(String name) {
-			this.name = name;
-		}
-		
-		public void setColors(List<Color> colors) {
-			this.colors = colors;
-		}
-		
-		@Override
-		public String toString() {
-			return name + " : " + colors;
-		}
-	}
-	
-	
-	public static final class SUser extends AvroBaseValue<MyUser> {
-		
-		static final long serialVersionUID = 1L;
-
-		public SUser() {}
-	
-		public SUser(MyUser u) {
-			super(u);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<String, MyUser> map(MyUser u) {
-			String namePrefix = u.getName().substring(0, 1);
-			return new Tuple2<String, MyUser>(namePrefix, u);
-		}
-	}
-	
-	public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
-			return val1;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Test Data
-	// --------------------------------------------------------------------------------------------
-	
-	public static final class Generator {
-		
-		private final Random rnd = new Random(2389756789345689276L);
-		
-		public MyUser nextUser() {
-			return randomUser();
-		}
-		
-		private MyUser randomUser() {
-			
-			int numColors = rnd.nextInt(5);
-			ArrayList<Color> colors = new ArrayList<Color>(numColors);
-			for (int i = 0; i < numColors; i++) {
-				colors.add(new Color(randomString(), rnd.nextDouble()));
-			}
-			
-			return new MyUser(randomString(), colors);
-		}
-		
-		private String randomString() {
-			char[] c = new char[this.rnd.nextInt(20) + 5];
-			
-			for (int i = 0; i < c.length; i++) {
-				c[i] = (char) (this.rnd.nextInt(150) + 40);
-			}
-			
-			return new String(c);
-		}
-	}
-	
-	public static void writeTestData(File testFile, int numRecords) throws IOException {
-		
-		DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
-		DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
-		
-		dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-		
-		
-		Generator generator = new Generator();
-		
-		for (int i = 0; i < numRecords; i++) {
-			MyUser user = generator.nextUser();
-			dataFileWriter.append(user);
-		}
-		
-		dataFileWriter.close();
-	}
-
-//	public static void main(String[] args) throws Exception {
-//		String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
-//		writeTestData(new File(testDataFile), 50);
-//	}
-	
-	public static void main(String[] args) throws Exception {
-		String inputPath = args[0];
-		
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-	
-		DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
-		
-		result.output(new DiscardingOuputFormat<Tuple2<String,MyUser>>());
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-addons/avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
deleted file mode 100644
index aa08006..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.TypeInformation;
-
-public class AvroInputFormatTypeExtractionTest {
-
-	@Test
-	public void testTypeExtraction() {
-		try {
-			InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
-			
-			TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			DataSet<MyAvroType> input = env.createInput(format);
-			TypeInformation<?> typeInfoDataSet = input.getType();
-			
-			
-			Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
-			Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
-			
-			Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
-			Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	public static final class MyAvroType {
-		
-		public String theString;
-		
-		private double aDouble;
-		
-		public double getaDouble() {
-			return aDouble;
-		}
-		
-		public void setaDouble(double aDouble) {
-			this.aDouble = aDouble;
-		}
-		
-		public void setTheString(String theString) {
-			this.theString = theString;
-		}
-		
-		public String getTheString() {
-			return theString;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java b/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
deleted file mode 100644
index 2387fd6..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/AvroRecordInputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.avro;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import junit.framework.Assert;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.BooleanListValue;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.LongMapValue;
-import org.apache.flink.api.java.record.io.avro.AvroRecordInputFormat.StringListValue;
-import org.apache.flink.api.java.record.io.avro.generated.Colors;
-import org.apache.flink.api.java.record.io.avro.generated.User;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-/**
- * Test the avro input format.
- * (The testcase is mostly the getting started tutorial of avro)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroRecordInputFormatTest {
-	
-	private File testFile;
-	
-	private final AvroRecordInputFormat format = new AvroRecordInputFormat();
-	final static String TEST_NAME = "Alyssa";
-	
-	final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-	final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-	
-	final static boolean TEST_ARRAY_BOOLEAN_1 = true;
-	final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-	
-	final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-	
-	final static CharSequence TEST_MAP_KEY1 = "KEY 1";
-	final static long TEST_MAP_VALUE1 = 8546456L;
-	final static CharSequence TEST_MAP_KEY2 = "KEY 2";
-	final static long TEST_MAP_VALUE2 = 17554L;
-	
-	
-	@Before
-	public void createFiles() throws IOException {
-		testFile = File.createTempFile("AvroInputFormatTest", null);
-		
-		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
-		stringArray.add(TEST_ARRAY_STRING_1);
-		stringArray.add(TEST_ARRAY_STRING_2);
-		
-		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
-		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
-		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-		
-		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
-		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
-		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-		
-		
-		User user1 = new User();
-		user1.setName(TEST_NAME);
-		user1.setFavoriteNumber(256);
-		user1.setTypeDoubleTest(123.45d);
-		user1.setTypeBoolTest(true);
-		user1.setTypeArrayString(stringArray);
-		user1.setTypeArrayBoolean(booleanArray);
-		user1.setTypeEnum(TEST_ENUM_COLOR);
-		user1.setTypeMap(longMap);
-	     
-		// Construct via builder
-		User user2 = User.newBuilder()
-		             .setName("Charlie")
-		             .setFavoriteColor("blue")
-		             .setFavoriteNumber(null)
-		             .setTypeBoolTest(false)
-		             .setTypeDoubleTest(1.337d)
-		             .setTypeNullTest(null)
-		             .setTypeLongTest(1337L)
-		             .setTypeArrayString(new ArrayList<CharSequence>())
-		             .setTypeArrayBoolean(new ArrayList<Boolean>())
-		             .setTypeNullableArray(null)
-		             .setTypeEnum(Colors.RED)
-		             .setTypeMap(new HashMap<CharSequence, Long>())
-		             .build();
-		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
-		dataFileWriter.create(user1.getSchema(), testFile);
-		dataFileWriter.append(user1);
-		dataFileWriter.append(user2);
-		dataFileWriter.close();
-	}
-	
-	@Test
-	public void testDeserialisation() throws IOException {
-		Configuration parameters = new Configuration();
-		format.setFilePath(testFile.toURI().toString());
-		format.configure(parameters);
-		FileInputSplit[] splits = format.createInputSplits(1);
-		Assert.assertEquals(splits.length, 1);
-		format.open(splits[0]);
-		Record record = new Record();
-		Assert.assertNotNull(format.nextRecord(record));
-		StringValue name = record.getField(0, StringValue.class);
-		Assert.assertNotNull("empty record", name);
-		Assert.assertEquals("name not equal",name.getValue(), TEST_NAME);
-		
-		// check arrays
-		StringListValue sl = record.getField(7, AvroRecordInputFormat.StringListValue.class);
-		Assert.assertEquals("element 0 not equal", sl.get(0).getValue(), TEST_ARRAY_STRING_1);
-		Assert.assertEquals("element 1 not equal", sl.get(1).getValue(), TEST_ARRAY_STRING_2);
-		
-		BooleanListValue bl = record.getField(8, AvroRecordInputFormat.BooleanListValue.class);
-		Assert.assertEquals("element 0 not equal", bl.get(0).getValue(), TEST_ARRAY_BOOLEAN_1);
-		Assert.assertEquals("element 1 not equal", bl.get(1).getValue(), TEST_ARRAY_BOOLEAN_2);
-		
-		// check enums
-		StringValue enumValue = record.getField(10, StringValue.class);
-		Assert.assertEquals("string representation of enum not equal", enumValue.getValue(), TEST_ENUM_COLOR.toString()); 
-		
-		// check maps
-		LongMapValue lm = record.getField(11, AvroRecordInputFormat.LongMapValue.class);
-		Assert.assertEquals("map value of key 1 not equal", lm.get(new StringValue(TEST_MAP_KEY1)).getValue(), TEST_MAP_VALUE1);
-		Assert.assertEquals("map value of key 2 not equal", lm.get(new StringValue(TEST_MAP_KEY2)).getValue(), TEST_MAP_VALUE2);
-		
-		
-		Assert.assertFalse("expecting second element", format.reachedEnd());
-		Assert.assertNotNull("expecting second element", format.nextRecord(record));
-		
-		Assert.assertNull(format.nextRecord(record));
-		Assert.assertTrue(format.reachedEnd());
-		
-		format.close();
-	}
-	
-	@After
-	public void deleteFiles() {
-		testFile.delete();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java b/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
deleted file mode 100644
index 4cacb7f..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/Colors.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.generated;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public enum Colors { 
-  RED, GREEN, BLUE  ;
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java b/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
deleted file mode 100644
index 61bbe41..0000000
--- a/flink-addons/avro/src/test/java/org/apache/flink/api/java/record/io/avro/generated/User.java
+++ /dev/null
@@ -1,755 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.java.record.io.avro.generated;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.java.record.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"n
 ame\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-  @Deprecated public java.lang.Long type_long_test;
-  @Deprecated public java.lang.Object type_double_test;
-  @Deprecated public java.lang.Object type_null_test;
-  @Deprecated public java.lang.Object type_bool_test;
-  @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
-  @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
-  @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
-  @Deprecated public org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
-  @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link \#newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.java.record.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-    this.type_long_test = type_long_test;
-    this.type_double_test = type_double_test;
-    this.type_null_test = type_null_test;
-    this.type_bool_test = type_bool_test;
-    this.type_array_string = type_array_string;
-    this.type_array_boolean = type_array_boolean;
-    this.type_nullable_array = type_nullable_array;
-    this.type_enum = type_enum;
-    this.type_map = type_map;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    case 3: return type_long_test;
-    case 4: return type_double_test;
-    case 5: return type_null_test;
-    case 6: return type_bool_test;
-    case 7: return type_array_string;
-    case 8: return type_array_boolean;
-    case 9: return type_nullable_array;
-    case 10: return type_enum;
-    case 11: return type_map;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    case 3: type_long_test = (java.lang.Long)value$; break;
-    case 4: type_double_test = (java.lang.Object)value$; break;
-    case 5: type_null_test = (java.lang.Object)value$; break;
-    case 6: type_bool_test = (java.lang.Object)value$; break;
-    case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
-    case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
-    case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
-    case 10: type_enum = (org.apache.flink.api.java.record.io.avro.generated.Colors)value$; break;
-    case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /**
-   * Gets the value of the 'type_long_test' field.
-   */
-  public java.lang.Long getTypeLongTest() {
-    return type_long_test;
-  }
-
-  /**
-   * Sets the value of the 'type_long_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeLongTest(java.lang.Long value) {
-    this.type_long_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_double_test' field.
-   */
-  public java.lang.Object getTypeDoubleTest() {
-    return type_double_test;
-  }
-
-  /**
-   * Sets the value of the 'type_double_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeDoubleTest(java.lang.Object value) {
-    this.type_double_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_null_test' field.
-   */
-  public java.lang.Object getTypeNullTest() {
-    return type_null_test;
-  }
-
-  /**
-   * Sets the value of the 'type_null_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeNullTest(java.lang.Object value) {
-    this.type_null_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_bool_test' field.
-   */
-  public java.lang.Object getTypeBoolTest() {
-    return type_bool_test;
-  }
-
-  /**
-   * Sets the value of the 'type_bool_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeBoolTest(java.lang.Object value) {
-    this.type_bool_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_array_string' field.
-   */
-  public java.util.List<java.lang.CharSequence> getTypeArrayString() {
-    return type_array_string;
-  }
-
-  /**
-   * Sets the value of the 'type_array_string' field.
-   * @param value the value to set.
-   */
-  public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
-    this.type_array_string = value;
-  }
-
-  /**
-   * Gets the value of the 'type_array_boolean' field.
-   */
-  public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
-    return type_array_boolean;
-  }
-
-  /**
-   * Sets the value of the 'type_array_boolean' field.
-   * @param value the value to set.
-   */
-  public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
-    this.type_array_boolean = value;
-  }
-
-  /**
-   * Gets the value of the 'type_nullable_array' field.
-   */
-  public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
-    return type_nullable_array;
-  }
-
-  /**
-   * Sets the value of the 'type_nullable_array' field.
-   * @param value the value to set.
-   */
-  public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
-    this.type_nullable_array = value;
-  }
-
-  /**
-   * Gets the value of the 'type_enum' field.
-   */
-  public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
-    return type_enum;
-  }
-
-  /**
-   * Sets the value of the 'type_enum' field.
-   * @param value the value to set.
-   */
-  public void setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
-    this.type_enum = value;
-  }
-
-  /**
-   * Gets the value of the 'type_map' field.
-   */
-  public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
-    return type_map;
-  }
-
-  /**
-   * Sets the value of the 'type_map' field.
-   * @param value the value to set.
-   */
-  public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
-    this.type_map = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder() {
-    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
-    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.java.record.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.java.record.io.avro.generated.User other) {
-    return new org.apache.flink.api.java.record.io.avro.generated.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-    private java.lang.Long type_long_test;
-    private java.lang.Object type_double_test;
-    private java.lang.Object type_null_test;
-    private java.lang.Object type_bool_test;
-    private java.util.List<java.lang.CharSequence> type_array_string;
-    private java.util.List<java.lang.Boolean> type_array_boolean;
-    private java.util.List<java.lang.CharSequence> type_nullable_array;
-    private org.apache.flink.api.java.record.io.avro.generated.Colors type_enum;
-    private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.java.record.io.avro.generated.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.type_long_test)) {
-        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.type_double_test)) {
-        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
-        fieldSetFlags()[4] = true;
-      }
-      if (isValidValue(fields()[5], other.type_null_test)) {
-        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
-        fieldSetFlags()[5] = true;
-      }
-      if (isValidValue(fields()[6], other.type_bool_test)) {
-        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
-        fieldSetFlags()[6] = true;
-      }
-      if (isValidValue(fields()[7], other.type_array_string)) {
-        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
-        fieldSetFlags()[7] = true;
-      }
-      if (isValidValue(fields()[8], other.type_array_boolean)) {
-        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
-        fieldSetFlags()[8] = true;
-      }
-      if (isValidValue(fields()[9], other.type_nullable_array)) {
-        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
-        fieldSetFlags()[9] = true;
-      }
-      if (isValidValue(fields()[10], other.type_enum)) {
-        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
-        fieldSetFlags()[10] = true;
-      }
-      if (isValidValue(fields()[11], other.type_map)) {
-        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
-        fieldSetFlags()[11] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.java.record.io.avro.generated.User other) {
-            super(org.apache.flink.api.java.record.io.avro.generated.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.type_long_test)) {
-        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.type_double_test)) {
-        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
-        fieldSetFlags()[4] = true;
-      }
-      if (isValidValue(fields()[5], other.type_null_test)) {
-        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
-        fieldSetFlags()[5] = true;
-      }
-      if (isValidValue(fields()[6], other.type_bool_test)) {
-        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
-        fieldSetFlags()[6] = true;
-      }
-      if (isValidValue(fields()[7], other.type_array_string)) {
-        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
-        fieldSetFlags()[7] = true;
-      }
-      if (isValidValue(fields()[8], other.type_array_boolean)) {
-        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
-        fieldSetFlags()[8] = true;
-      }
-      if (isValidValue(fields()[9], other.type_nullable_array)) {
-        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
-        fieldSetFlags()[9] = true;
-      }
-      if (isValidValue(fields()[10], other.type_enum)) {
-        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
-        fieldSetFlags()[10] = true;
-      }
-      if (isValidValue(fields()[11], other.type_map)) {
-        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
-        fieldSetFlags()[11] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_long_test' field */
-    public java.lang.Long getTypeLongTest() {
-      return type_long_test;
-    }
-    
-    /** Sets the value of the 'type_long_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
-      validate(fields()[3], value);
-      this.type_long_test = value;
-      fieldSetFlags()[3] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_long_test' field has been set */
-    public boolean hasTypeLongTest() {
-      return fieldSetFlags()[3];
-    }
-    
-    /** Clears the value of the 'type_long_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeLongTest() {
-      type_long_test = null;
-      fieldSetFlags()[3] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_double_test' field */
-    public java.lang.Object getTypeDoubleTest() {
-      return type_double_test;
-    }
-    
-    /** Sets the value of the 'type_double_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
-      validate(fields()[4], value);
-      this.type_double_test = value;
-      fieldSetFlags()[4] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_double_test' field has been set */
-    public boolean hasTypeDoubleTest() {
-      return fieldSetFlags()[4];
-    }
-    
-    /** Clears the value of the 'type_double_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeDoubleTest() {
-      type_double_test = null;
-      fieldSetFlags()[4] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_null_test' field */
-    public java.lang.Object getTypeNullTest() {
-      return type_null_test;
-    }
-    
-    /** Sets the value of the 'type_null_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
-      validate(fields()[5], value);
-      this.type_null_test = value;
-      fieldSetFlags()[5] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_null_test' field has been set */
-    public boolean hasTypeNullTest() {
-      return fieldSetFlags()[5];
-    }
-    
-    /** Clears the value of the 'type_null_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullTest() {
-      type_null_test = null;
-      fieldSetFlags()[5] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_bool_test' field */
-    public java.lang.Object getTypeBoolTest() {
-      return type_bool_test;
-    }
-    
-    /** Sets the value of the 'type_bool_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
-      validate(fields()[6], value);
-      this.type_bool_test = value;
-      fieldSetFlags()[6] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_bool_test' field has been set */
-    public boolean hasTypeBoolTest() {
-      return fieldSetFlags()[6];
-    }
-    
-    /** Clears the value of the 'type_bool_test' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeBoolTest() {
-      type_bool_test = null;
-      fieldSetFlags()[6] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_array_string' field */
-    public java.util.List<java.lang.CharSequence> getTypeArrayString() {
-      return type_array_string;
-    }
-    
-    /** Sets the value of the 'type_array_string' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
-      validate(fields()[7], value);
-      this.type_array_string = value;
-      fieldSetFlags()[7] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_array_string' field has been set */
-    public boolean hasTypeArrayString() {
-      return fieldSetFlags()[7];
-    }
-    
-    /** Clears the value of the 'type_array_string' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayString() {
-      type_array_string = null;
-      fieldSetFlags()[7] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_array_boolean' field */
-    public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
-      return type_array_boolean;
-    }
-    
-    /** Sets the value of the 'type_array_boolean' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
-      validate(fields()[8], value);
-      this.type_array_boolean = value;
-      fieldSetFlags()[8] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_array_boolean' field has been set */
-    public boolean hasTypeArrayBoolean() {
-      return fieldSetFlags()[8];
-    }
-    
-    /** Clears the value of the 'type_array_boolean' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeArrayBoolean() {
-      type_array_boolean = null;
-      fieldSetFlags()[8] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_nullable_array' field */
-    public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
-      return type_nullable_array;
-    }
-    
-    /** Sets the value of the 'type_nullable_array' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
-      validate(fields()[9], value);
-      this.type_nullable_array = value;
-      fieldSetFlags()[9] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_nullable_array' field has been set */
-    public boolean hasTypeNullableArray() {
-      return fieldSetFlags()[9];
-    }
-    
-    /** Clears the value of the 'type_nullable_array' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeNullableArray() {
-      type_nullable_array = null;
-      fieldSetFlags()[9] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_enum' field */
-    public org.apache.flink.api.java.record.io.avro.generated.Colors getTypeEnum() {
-      return type_enum;
-    }
-    
-    /** Sets the value of the 'type_enum' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.java.record.io.avro.generated.Colors value) {
-      validate(fields()[10], value);
-      this.type_enum = value;
-      fieldSetFlags()[10] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_enum' field has been set */
-    public boolean hasTypeEnum() {
-      return fieldSetFlags()[10];
-    }
-    
-    /** Clears the value of the 'type_enum' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeEnum() {
-      type_enum = null;
-      fieldSetFlags()[10] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_map' field */
-    public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
-      return type_map;
-    }
-    
-    /** Sets the value of the 'type_map' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
-      validate(fields()[11], value);
-      this.type_map = value;
-      fieldSetFlags()[11] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_map' field has been set */
-    public boolean hasTypeMap() {
-      return fieldSetFlags()[11];
-    }
-    
-    /** Clears the value of the 'type_map' field */
-    public org.apache.flink.api.java.record.io.avro.generated.User.Builder clearTypeMap() {
-      type_map = null;
-      fieldSetFlags()[11] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
-        record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
-        record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
-        record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
-        record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
-        record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
-        record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
-        record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
-        record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.java.record.io.avro.generated.Colors) defaultValue(fields()[10]);
-        record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/resources/avro/user.avsc b/flink-addons/avro/src/test/resources/avro/user.avsc
deleted file mode 100644
index af3cb75..0000000
--- a/flink-addons/avro/src/test/resources/avro/user.avsc
+++ /dev/null
@@ -1,19 +0,0 @@
-
-{"namespace": "org.apache.flink.api.java.record.io.avro.generated",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]},
-     {"name": "type_long_test", "type": ["long", "null"]},
-     {"name": "type_double_test", "type": ["double"]},
-     {"name": "type_null_test", "type": ["null"]},
-     {"name": "type_bool_test", "type": ["boolean"]},
-     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},  
-     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, 
-     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
-     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
-     {"name": "type_map", "type": {"type": "map", "values": "long"}} 
- ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-addons/avro/src/test/resources/testdata.avro b/flink-addons/avro/src/test/resources/testdata.avro
deleted file mode 100644
index 45308b9..0000000
Binary files a/flink-addons/avro/src/test/resources/testdata.avro and /dev/null differ