Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:33 UTC

[10/21] flink git commit: [FLINK-7420] [avro] Move all Avro code to flink-avro

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
deleted file mode 100644
index be968c5..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Tests for the {@link AvroInputFormat} reading Pojos.
- */
-@RunWith(Parameterized.class)
-public class AvroPojoTest extends MultipleProgramsTestBase {
-	public AvroPojoTest(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	private File inFile;
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-		inFile = tempFolder.newFile();
-		AvroRecordInputFormatTest.writeTestFile(inFile);
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testSimpleAvroRead() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-		DataSet<User> usersDS = env.createInput(users)
-				// null out the map field because its iteration order differs across JVMs (hard to test)
-		.map(new MapFunction<User, User>() {
-			@Override
-			public User map(User value) throws Exception {
-				value.setTypeMap(null);
-				return value;
-			}
-		});
-
-		usersDS.writeAsText(resultPath);
-
-		env.execute("Simple Avro read job");
-
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
-	}
-
-	@Test
-	public void testSerializeWithAvro() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableForceAvro();
-		Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-		DataSet<User> usersDS = env.createInput(users)
-				// replace the map with a fixed single entry so the printed output is deterministic across JVMs
-				.map(new MapFunction<User, User>() {
-					@Override
-					public User map(User value) throws Exception {
-						Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1);
-						ab.put("hehe", 12L);
-						value.setTypeMap(ab);
-						return value;
-					}
-				});
-
-		usersDS.writeAsText(resultPath);
-
-		env.execute("Simple Avro read job");
-
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
-
-	}
-
-	@Test
-	public void testKeySelection() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-		Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-		DataSet<User> usersDS = env.createInput(users);
-
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
-				}
-			}
-		});
-		res.writeAsText(resultPath);
-		env.execute("Avro Key selection");
-
-		expected = "(Alyssa,1)\n(Charlie,1)\n";
-	}
-
-	@Test
-	public void testWithAvroGenericSer() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableForceAvro();
-		Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-		DataSet<User> usersDS = env.createInput(users);
-
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
-			@Override
-			public String getKey(User value) throws Exception {
-				return String.valueOf(value.getName());
-			}
-		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
-				}
-			}
-		});
-
-		res.writeAsText(resultPath);
-		env.execute("Avro Key selection");
-
-		expected = "(Charlie,1)\n(Alyssa,1)\n";
-	}
-
-	@Test
-	public void testWithKryoGenericSer() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableForceKryo();
-		Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-		DataSet<User> usersDS = env.createInput(users);
-
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
-			@Override
-			public String getKey(User value) throws Exception {
-				return String.valueOf(value.getName());
-			}
-		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
-				}
-			}
-		});
-
-		res.writeAsText(resultPath);
-		env.execute("Avro Key selection");
-
-		expected = "(Charlie,1)\n(Alyssa,1)\n";
-	}
-
-	/**
-	 * Test some known fields for grouping on.
-	 */
-	@Test
-	public void testAllFields() throws Exception {
-		for (String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
-			testField(fieldName);
-		}
-	}
-
-	private void testField(final String fieldName) throws Exception {
-		before();
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-		DataSet<User> usersDS = env.createInput(users);
-
-		DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
-				for (User u : values) {
-					out.collect(u.get(fieldName));
-				}
-			}
-		});
-		res.writeAsText(resultPath);
-		env.execute("Simple Avro read job");
-
-		// test if automatic registration of the Types worked
-		ExecutionConfig ec = env.getConfig();
-		Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class));
-
-		if (fieldName.equals("name")) {
-			expected = "Alyssa\nCharlie";
-		} else if (fieldName.equals("type_enum")) {
-			expected = "GREEN\nRED\n";
-		} else if (fieldName.equals("type_double_test")) {
-			expected = "123.45\n1.337\n";
-		} else {
-			Assert.fail("Unknown field");
-		}
-
-		after();
-	}
-}
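
For context, the tests above exercise Flink's serializer selection on the ExecutionConfig. A minimal sketch of the switches involved, assuming the DataSet API of this Flink version (the class name here is hypothetical):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class SerializerModeSketch {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            ExecutionConfig config = env.getConfig();

            // Default: Flink analyzes types and uses its POJO serializer where
            // possible, falling back to Kryo for generic types.

            // Force Avro serialization for POJO types (see testSerializeWithAvro):
            config.enableForceAvro();

            // Alternatively, force Kryo (see testWithKryoGenericSer):
            // config.enableForceKryo();
        }
    }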

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
deleted file mode 100644
index 7bff28a..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.typeutils.AvroTypeInfo;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests the Avro input format.
- * (The test case mostly follows Avro's getting-started tutorial.)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroRecordInputFormatTest {
-
-	public File testFile;
-
-	static final String TEST_NAME = "Alyssa";
-
-	static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-	static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-
-	static final boolean TEST_ARRAY_BOOLEAN_1 = true;
-	static final boolean TEST_ARRAY_BOOLEAN_2 = false;
-
-	static final Colors TEST_ENUM_COLOR = Colors.GREEN;
-
-	static final String TEST_MAP_KEY1 = "KEY 1";
-	static final long TEST_MAP_VALUE1 = 8546456L;
-	static final String TEST_MAP_KEY2 = "KEY 2";
-	static final long TEST_MAP_VALUE2 = 17554L;
-
-	static final int TEST_NUM = 239;
-	static final String TEST_STREET = "Baker Street";
-	static final String TEST_CITY = "London";
-	static final String TEST_STATE = "London";
-	static final String TEST_ZIP = "NW1 6XE";
-
-	private Schema userSchema = new User().getSchema();
-
-	public static void writeTestFile(File testFile) throws IOException {
-		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
-		stringArray.add(TEST_ARRAY_STRING_1);
-		stringArray.add(TEST_ARRAY_STRING_2);
-
-		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
-		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
-		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-
-		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
-		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
-		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-
-		Address addr = new Address();
-		addr.setNum(TEST_NUM);
-		addr.setStreet(TEST_STREET);
-		addr.setCity(TEST_CITY);
-		addr.setState(TEST_STATE);
-		addr.setZip(TEST_ZIP);
-
-		User user1 = new User();
-
-		user1.setName(TEST_NAME);
-		user1.setFavoriteNumber(256);
-		user1.setTypeDoubleTest(123.45d);
-		user1.setTypeBoolTest(true);
-		user1.setTypeArrayString(stringArray);
-		user1.setTypeArrayBoolean(booleanArray);
-		user1.setTypeEnum(TEST_ENUM_COLOR);
-		user1.setTypeMap(longMap);
-		user1.setTypeNested(addr);
-
-		// Construct via builder
-		User user2 = User.newBuilder()
-				.setName("Charlie")
-				.setFavoriteColor("blue")
-				.setFavoriteNumber(null)
-				.setTypeBoolTest(false)
-				.setTypeDoubleTest(1.337d)
-				.setTypeNullTest(null)
-				.setTypeLongTest(1337L)
-				.setTypeArrayString(new ArrayList<CharSequence>())
-				.setTypeArrayBoolean(new ArrayList<Boolean>())
-				.setTypeNullableArray(null)
-				.setTypeEnum(Colors.RED)
-				.setTypeMap(new HashMap<CharSequence, Long>())
-				.setTypeFixed(null)
-				.setTypeUnion(null)
-				.setTypeNested(
-						Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
-								.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
-								.build())
-				.build();
-		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
-		dataFileWriter.create(user1.getSchema(), testFile);
-		dataFileWriter.append(user1);
-		dataFileWriter.append(user2);
-		dataFileWriter.close();
-	}
-
-	@Before
-	public void createFiles() throws IOException {
-		testFile = File.createTempFile("AvroInputFormatTest", null);
-		writeTestFile(testFile);
-	}
-
-	/**
-	 * Test if the AvroInputFormat is able to properly read data from an Avro file.
-	 * @throws IOException
-	 */
-	@Test
-	public void testDeserialisation() throws IOException {
-		Configuration parameters = new Configuration();
-
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-
-		format.configure(parameters);
-		FileInputSplit[] splits = format.createInputSplits(1);
-		assertEquals(splits.length, 1);
-		format.open(splits[0]);
-
-		User u = format.nextRecord(null);
-		assertNotNull(u);
-
-		String name = u.getName().toString();
-		assertNotNull("empty record", name);
-		assertEquals("name not equal", TEST_NAME, name);
-
-		// check arrays
-		List<CharSequence> sl = u.getTypeArrayString();
-		assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
-		assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
-
-		List<Boolean> bl = u.getTypeArrayBoolean();
-		assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-		assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
-
-		// check enums
-		Colors enumValue = u.getTypeEnum();
-		assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-
-		// check maps
-		Map<CharSequence, Long> lm = u.getTypeMap();
-		assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-		assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-
-		assertFalse("expecting second element", format.reachedEnd());
-		assertNotNull("expecting second element", format.nextRecord(u));
-
-		assertNull(format.nextRecord(u));
-		assertTrue(format.reachedEnd());
-
-		format.close();
-	}
-
-	/**
-	 * Test if the AvroInputFormat is able to properly read data from an Avro file without reusing the Avro record.
-	 * @throws IOException
-	 */
-	@Test
-	public void testDeserialisationReuseAvroRecordFalse() throws IOException {
-		Configuration parameters = new Configuration();
-
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-		format.setReuseAvroValue(false);
-
-		format.configure(parameters);
-		FileInputSplit[] splits = format.createInputSplits(1);
-		assertEquals(splits.length, 1);
-		format.open(splits[0]);
-
-		User u = format.nextRecord(null);
-		assertNotNull(u);
-
-		String name = u.getName().toString();
-		assertNotNull("empty record", name);
-		assertEquals("name not equal", TEST_NAME, name);
-
-		// check arrays
-		List<CharSequence> sl = u.getTypeArrayString();
-		assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
-		assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
-
-		List<Boolean> bl = u.getTypeArrayBoolean();
-		assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-		assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
-
-		// check enums
-		Colors enumValue = u.getTypeEnum();
-		assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-
-		// check maps
-		Map<CharSequence, Long> lm = u.getTypeMap();
-		assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-		assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-
-		assertFalse("expecting second element", format.reachedEnd());
-		assertNotNull("expecting second element", format.nextRecord(u));
-
-		assertNull(format.nextRecord(u));
-		assertTrue(format.reachedEnd());
-
-		format.close();
-	}
-
-	/**
-	 * Test if the Flink serialization is able to properly process GenericData.Record types.
-	 * Usually users of Avro generate classes (POJOs) from Avro schemas.
-	 * However, if generated classes are not available, one can also use GenericData.Record.
-	 * It is an untyped key-value record that uses a schema to validate the correctness of the data.
-	 *
-	 * <p>It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
-	 */
-	@Test
-	public void testDeserializeToGenericType() throws IOException {
-		DatumReader<GenericData.Record> datumReader = new GenericDatumReader<>(userSchema);
-
-		try (FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
-			// initialize Record by reading it from disk (that's easier than creating it by hand)
-			GenericData.Record rec = new GenericData.Record(userSchema);
-			dataFileReader.next(rec);
-
-			// check if record has been read correctly
-			assertNotNull(rec);
-			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
-			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-			assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
-
-			// now serialize it with our framework:
-			TypeInformation<GenericData.Record> te = TypeExtractor.createTypeInfo(GenericData.Record.class);
-
-			ExecutionConfig ec = new ExecutionConfig();
-			Assert.assertEquals(GenericTypeInfo.class, te.getClass());
-
-			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
-
-			TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
-			Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
-			Assert.assertTrue(
-					ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
-							ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
-
-			ByteArrayOutputStream out = new ByteArrayOutputStream();
-			try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
-				tser.serialize(rec, outView);
-			}
-
-			GenericData.Record newRec;
-			try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
-					new ByteArrayInputStream(out.toByteArray()))) {
-				newRec = tser.deserialize(inView);
-			}
-
-			// check if it is still the same
-			assertNotNull(newRec);
-			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
-			assertEquals("name not equal", TEST_NAME, newRec.get("name").toString());
-			assertEquals(null, newRec.get("type_long_test"));
-		}
-	}
-
-	/**
-	 * This test validates proper serialization with specific (generated POJO) types.
-	 */
-	@Test
-	public void testDeserializeToSpecificType() throws IOException {
-
-		DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
-
-		try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
-			User rec = dataFileReader.next();
-
-			// check if record has been read correctly
-			assertNotNull(rec);
-			assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
-			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-
-			// now serialize it with our framework:
-			ExecutionConfig ec = new ExecutionConfig();
-			TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class);
-
-			Assert.assertEquals(AvroTypeInfo.class, te.getClass());
-			TypeSerializer<User> tser = te.createSerializer(ec);
-
-			ByteArrayOutputStream out = new ByteArrayOutputStream();
-			try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
-				tser.serialize(rec, outView);
-			}
-
-			User newRec;
-			try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
-					new ByteArrayInputStream(out.toByteArray()))) {
-				newRec = tser.deserialize(inView);
-			}
-
-			// check if it is still the same
-			assertNotNull(newRec);
-			assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
-			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
-		}
-	}
-
-	/**
-	 * Test if the AvroInputFormat is able to properly read data from an Avro
-	 * file as a GenericRecord.
-	 *
-	 * @throws IOException
-	 */
-	@Test
-	public void testDeserialisationGenericRecord() throws IOException {
-		Configuration parameters = new Configuration();
-
-		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
-				GenericRecord.class);
-
-		doTestDeserializationGenericRecord(format, parameters);
-	}
-
-	/**
-	 * Helper method to test GenericRecord serialisation.
-	 *
-	 * @param format
-	 *            the format to test
-	 * @param parameters
-	 *            the configuration to use
-	 * @throws IOException
-	 *             thrown if there is an issue
-	 */
-	@SuppressWarnings("unchecked")
-	private void doTestDeserializationGenericRecord(final AvroInputFormat<GenericRecord> format,
-			final Configuration parameters) throws IOException {
-		try {
-			format.configure(parameters);
-			FileInputSplit[] splits = format.createInputSplits(1);
-			assertEquals(splits.length, 1);
-			format.open(splits[0]);
-
-			GenericRecord u = format.nextRecord(null);
-			assertNotNull(u);
-			assertEquals("The schemas should be equal", userSchema, u.getSchema());
-
-			String name = u.get("name").toString();
-			assertNotNull("empty record", name);
-			assertEquals("name not equal", TEST_NAME, name);
-
-			// check arrays
-			List<CharSequence> sl = (List<CharSequence>) u.get("type_array_string");
-			assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
-			assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
-
-			List<Boolean> bl = (List<Boolean>) u.get("type_array_boolean");
-			assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-			assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
-
-			// check enums
-			GenericData.EnumSymbol enumValue = (GenericData.EnumSymbol) u.get("type_enum");
-			assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), enumValue.toString());
-
-			// check maps
-			Map<CharSequence, Long> lm = (Map<CharSequence, Long>) u.get("type_map");
-			assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-			assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-
-			assertFalse("expecting second element", format.reachedEnd());
-			assertNotNull("expecting second element", format.nextRecord(u));
-
-			assertNull(format.nextRecord(u));
-			assertTrue(format.reachedEnd());
-		} finally {
-			format.close();
-		}
-	}
-
-	/**
-	 * Test if the AvroInputFormat is able to properly read data from an Avro
-	 * file as a GenericRecord when value reuse is disabled.
-	 *
-	 * @throws IOException if there is an error
-	 */
-	@Test
-	public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
-		Configuration parameters = new Configuration();
-
-		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
-				GenericRecord.class);
-		format.configure(parameters);
-		format.setReuseAvroValue(false);
-
-		doTestDeserializationGenericRecord(format, parameters);
-	}
-
-	@After
-	public void deleteFiles() {
-		testFile.delete();
-	}
-
-}
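
As the javadoc for testDeserializeToGenericType notes, GenericData.Record round-trips through Kryo with an Avro schema serializer registered, while generated POJOs get the dedicated Avro serializer. A minimal sketch of reading an Avro file as GenericRecords, assuming a hypothetical input path:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.core.fs.Path;

    public class GenericRecordReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical path; any Avro container file works.
            AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(
                    new Path("file:///tmp/users.avro"), GenericRecord.class);

            DataSet<GenericRecord> records = env.createInput(format);
            records.print(); // print() executes the program and prints the records
        }
    }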

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
deleted file mode 100644
index 6401a87..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro;
-
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.Fixed16;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the Avro input format.
- * (The test case mostly follows Avro's getting-started tutorial.)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroSplittableInputFormatTest {
-
-	private File testFile;
-
-	static final String TEST_NAME = "Alyssa";
-
-	static final String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-	static final String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-
-	static final boolean TEST_ARRAY_BOOLEAN_1 = true;
-	static final boolean TEST_ARRAY_BOOLEAN_2 = false;
-
-	static final Colors TEST_ENUM_COLOR = Colors.GREEN;
-
-	static final String TEST_MAP_KEY1 = "KEY 1";
-	static final long TEST_MAP_VALUE1 = 8546456L;
-	static final String TEST_MAP_KEY2 = "KEY 2";
-	static final long TEST_MAP_VALUE2 = 17554L;
-
-	static final Integer TEST_NUM = new Integer(239);
-	static final String TEST_STREET = "Baker Street";
-	static final String TEST_CITY = "London";
-	static final String TEST_STATE = "London";
-	static final String TEST_ZIP = "NW1 6XE";
-
-	static final int NUM_RECORDS = 5000;
-
-	@Before
-	public void createFiles() throws IOException {
-		testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
-
-		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
-		stringArray.add(TEST_ARRAY_STRING_1);
-		stringArray.add(TEST_ARRAY_STRING_2);
-
-		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
-		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
-		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-
-		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
-		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
-		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-
-		Address addr = new Address();
-		addr.setNum(new Integer(TEST_NUM));
-		addr.setStreet(TEST_STREET);
-		addr.setCity(TEST_CITY);
-		addr.setState(TEST_STATE);
-		addr.setZip(TEST_ZIP);
-
-		User user1 = new User();
-		user1.setName(TEST_NAME);
-		user1.setFavoriteNumber(256);
-		user1.setTypeDoubleTest(123.45d);
-		user1.setTypeBoolTest(true);
-		user1.setTypeArrayString(stringArray);
-		user1.setTypeArrayBoolean(booleanArray);
-		user1.setTypeEnum(TEST_ENUM_COLOR);
-		user1.setTypeMap(longMap);
-		user1.setTypeNested(addr);
-
-		// Construct via builder
-		User user2 = User.newBuilder()
-				.setName(TEST_NAME)
-				.setFavoriteColor("blue")
-				.setFavoriteNumber(null)
-				.setTypeBoolTest(false)
-				.setTypeDoubleTest(1.337d)
-				.setTypeNullTest(null)
-				.setTypeLongTest(1337L)
-				.setTypeArrayString(new ArrayList<CharSequence>())
-				.setTypeArrayBoolean(new ArrayList<Boolean>())
-				.setTypeNullableArray(null)
-				.setTypeEnum(Colors.RED)
-				.setTypeMap(new HashMap<CharSequence, Long>())
-				.setTypeFixed(new Fixed16())
-				.setTypeUnion(123L)
-				.setTypeNested(
-						Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
-								.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
-								.build())
-				.build();
-		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
-		dataFileWriter.create(user1.getSchema(), testFile);
-		dataFileWriter.append(user1);
-		dataFileWriter.append(user2);
-
-		Random rnd = new Random(1337);
-		for (int i = 0; i < NUM_RECORDS - 2; i++) {
-			User user = new User();
-			user.setName(TEST_NAME + rnd.nextInt());
-			user.setFavoriteNumber(rnd.nextInt());
-			user.setTypeDoubleTest(rnd.nextDouble());
-			user.setTypeBoolTest(true);
-			user.setTypeArrayString(stringArray);
-			user.setTypeArrayBoolean(booleanArray);
-			user.setTypeEnum(TEST_ENUM_COLOR);
-			user.setTypeMap(longMap);
-			Address address = new Address();
-			address.setNum(new Integer(TEST_NUM));
-			address.setStreet(TEST_STREET);
-			address.setCity(TEST_CITY);
-			address.setState(TEST_STATE);
-			address.setZip(TEST_ZIP);
-			user.setTypeNested(address);
-
-			dataFileWriter.append(user);
-		}
-		dataFileWriter.close();
-	}
-
-	@Test
-	public void testSplittedIF() throws IOException {
-		Configuration parameters = new Configuration();
-
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-
-		format.configure(parameters);
-		FileInputSplit[] splits = format.createInputSplits(4);
-		assertEquals(splits.length, 4);
-		int elements = 0;
-		int[] elementsPerSplit = new int[4];
-		for (int i = 0; i < splits.length; i++) {
-			format.open(splits[i]);
-			while (!format.reachedEnd()) {
-				User u = format.nextRecord(null);
-				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
-				elements++;
-				elementsPerSplit[i]++;
-			}
-			format.close();
-		}
-
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
-		Assert.assertEquals(NUM_RECORDS, elements);
-		format.close();
-	}
-
-	@Test
-	public void testAvroRecoveryWithFailureAtStart() throws Exception {
-		final int recordsUntilCheckpoint = 132;
-
-		Configuration parameters = new Configuration();
-
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-		format.configure(parameters);
-
-		FileInputSplit[] splits = format.createInputSplits(4);
-		assertEquals(splits.length, 4);
-
-		int elements = 0;
-		int[] elementsPerSplit = new int[4];
-		for (int i = 0; i < splits.length; i++) {
-			format.reopen(splits[i], format.getCurrentState());
-			while (!format.reachedEnd()) {
-				User u = format.nextRecord(null);
-				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
-				elements++;
-
-				if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
-
-					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
-					Tuple2<Long, Long> state = format.getCurrentState();
-
-					// use a fresh format so that no state carries over from the previous one
-					// (as would be the case in an actual recovery)
-					format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
-
-					format.reopen(splits[i], state);
-					assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
-				}
-				elementsPerSplit[i]++;
-			}
-			format.close();
-		}
-
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
-		Assert.assertEquals(NUM_RECORDS, elements);
-		format.close();
-	}
-
-	@Test
-	public void testAvroRecovery() throws Exception {
-		final int recordsUntilCheckpoint = 132;
-
-		Configuration parameters = new Configuration();
-
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-		format.configure(parameters);
-
-		FileInputSplit[] splits = format.createInputSplits(4);
-		assertEquals(splits.length, 4);
-
-		int elements = 0;
-		int[] elementsPerSplit = new int[4];
-		for (int i = 0; i < splits.length; i++) {
-			format.open(splits[i]);
-			while (!format.reachedEnd()) {
-				User u = format.nextRecord(null);
-				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
-				elements++;
-
-				if (format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
-
-					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
-					Tuple2<Long, Long> state = format.getCurrentState();
-
-					// use a fresh format so that no state carries over from the previous one
-					// (as would be the case in an actual recovery)
-					format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
-
-					format.reopen(splits[i], state);
-					assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
-				}
-				elementsPerSplit[i]++;
-			}
-			format.close();
-		}
-
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
-		Assert.assertEquals(NUM_RECORDS, elements);
-		format.close();
-	}
-
-	/*
-	This test generated the reference values for the test of Flink's InputFormat.
-
-	This dependency needs to be added
-
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro-mapred</artifactId>
-            <version>1.7.6</version>
-        </dependency>
-
-	@Test
-	public void testHadoop() throws Exception {
-		JobConf jf = new JobConf();
-		FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI()));
-		jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false);
-		org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>();
-		InputSplit[] sp = format.getSplits(jf, 4);
-		int elementsPerSplit[] = new int[4];
-		int cnt = 0;
-		int i = 0;
-		for (InputSplit s:sp) {
-			RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter());
-			AvroWrapper<User> k = r.createKey();
-			NullWritable v = r.createValue();
-
-			while (r.next(k, v)) {
-				cnt++;
-				elementsPerSplit[i]++;
-			}
-			i++;
-		}
-		System.out.println("Status "+Arrays.toString(elementsPerSplit));
-	} **/
-
-	@After
-	public void deleteFiles() {
-		testFile.delete();
-	}
-}
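
The two recovery tests above verify the checkpointable-input-format contract: getCurrentState() captures the read position within the current Avro block, and reopen() resumes a fresh format instance from that state. A minimal sketch of that round trip, assuming a hypothetical file path and the generated User test type:

    import org.apache.flink.api.io.avro.generated.User;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    public class CheckpointRestoreSketch {
        public static void main(String[] args) throws Exception {
            Path path = new Path("file:///tmp/users.avro"); // hypothetical
            AvroInputFormat<User> format = new AvroInputFormat<>(path, User.class);
            format.configure(new Configuration());
            FileInputSplit[] splits = format.createInputSplits(1);

            format.open(splits[0]);
            format.nextRecord(null); // consume some records ...
            Tuple2<Long, Long> state = format.getCurrentState(); // snapshot position
            format.close();

            // A fresh instance resumes exactly where the old one left off.
            AvroInputFormat<User> restored = new AvroInputFormat<>(path, User.class);
            restored.reopen(splits[0], state);
            while (!restored.reachedEnd()) {
                restored.nextRecord(null);
            }
            restored.close();
        }
    }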

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
deleted file mode 100644
index 96ffb7f..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/AvroTypeExample.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro.example;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-import java.io.IOException;
-import java.util.Random;
-
-/**
- * Example that shows how to use an Avro type in a program.
- */
-@SuppressWarnings("serial")
-public class AvroTypeExample {
-
-	public static void main(String[] args) throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<User> users = env.createInput(new UserGeneratingInputFormat());
-
-		users
-			.map(new NumberExtractingMapper())
-			.groupBy(1)
-			.reduceGroup(new ConcatenatingReducer())
-			.print();
-	}
-
-	private static final class NumberExtractingMapper implements MapFunction<User, Tuple2<User, Integer>> {
-
-		@Override
-		public Tuple2<User, Integer> map(User user) {
-			return new Tuple2<User, Integer>(user, user.getFavoriteNumber());
-		}
-	}
-
-	private static final class ConcatenatingReducer implements GroupReduceFunction<Tuple2<User, Integer>, Tuple2<Integer, String>> {
-
-		@Override
-		public void reduce(Iterable<Tuple2<User, Integer>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
-			int number = 0;
-			StringBuilder colors = new StringBuilder();
-
-			for (Tuple2<User, Integer> u : values) {
-				number = u.f1;
-				colors.append(u.f0.getFavoriteColor()).append(" - ");
-			}
-
-			colors.setLength(colors.length() - 3);
-			out.collect(new Tuple2<Integer, String>(number, colors.toString()));
-		}
-	}
-
-	private static final class UserGeneratingInputFormat extends GenericInputFormat<User> {
-
-		private static final long serialVersionUID = 1L;
-
-		private static final int NUM = 100;
-
-		private final Random rnd = new Random(32498562304986L);
-
-		private static final String[] NAMES = { "Peter", "Bob", "Liddy", "Alexander", "Stan" };
-
-		private static final String[] COLORS = { "mauve", "crimson", "copper", "sky", "grass" };
-
-		private int count;
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			return count >= NUM;
-		}
-
-		@Override
-		public User nextRecord(User reuse) throws IOException {
-			count++;
-
-			User u = new User();
-			u.setName(NAMES[rnd.nextInt(NAMES.length)]);
-			u.setFavoriteColor(COLORS[rnd.nextInt(COLORS.length)]);
-			u.setFavoriteNumber(rnd.nextInt(87));
-			return u;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
deleted file mode 100644
index 4608f96..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/example/User.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.io.avro.example;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.avro.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link #newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder() {
-    return new org.apache.flink.api.io.avro.example.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User.Builder other) {
-    return new org.apache.flink.api.io.avro.example.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.io.avro.example.User.Builder newBuilder(org.apache.flink.api.io.avro.example.User other) {
-    return new org.apache.flink.api.io.avro.example.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.io.avro.example.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.io.avro.example.User other) {
-            super(org.apache.flink.api.io.avro.example.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.io.avro.example.User.Builder setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.io.avro.example.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.io.avro.example.User.Builder setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.io.avro.example.User.Builder setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.io.avro.example.User.Builder clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
deleted file mode 100644
index 5ae88ca..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.Path;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for the type extraction of the {@link AvroInputFormat}.
- */
-public class AvroInputFormatTypeExtractionTest {
-
-	@Test
-	public void testTypeExtraction() {
-		try {
-			InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
-
-			TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
-
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			DataSet<MyAvroType> input = env.createInput(format);
-			TypeInformation<?> typeInfoDataSet = input.getType();
-
-			Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
-			Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
-
-			Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
-			Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Test type.
-	 */
-	public static final class MyAvroType {
-
-		public String theString;
-
-		public MyAvroType recursive;
-
-		private double aDouble;
-
-		public double getaDouble() {
-			return aDouble;
-		}
-
-		public void setaDouble(double aDouble) {
-			this.aDouble = aDouble;
-		}
-
-		public void setTheString(String theString) {
-			this.theString = theString;
-		}
-
-		public String getTheString() {
-			return theString;
-		}
-	}
-}
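
The deleted test's extraction path is straightforward to reproduce: MyAvroType follows Flink's POJO rules (public no-arg constructor, public fields or getter/setter pairs), so both routes below yield a PojoTypeInfo. A sketch, with the input path a placeholder:

    AvroInputFormat<MyAvroType> format =
        new AvroInputFormat<>(new Path("file:///tmp/users.avro"), MyAvroType.class);
    TypeInformation<?> fromFormat = TypeExtractor.getInputFormatTypes(format);
    TypeInformation<?> fromClass  = TypeExtractor.createTypeInfo(MyAvroType.class);
    // Both should be a PojoTypeInfo<MyAvroType> covering theString, recursive, aDouble.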

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
deleted file mode 100644
index 71ebd78..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroOutputFormatTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import static org.apache.flink.api.java.io.AvroOutputFormat.Codec;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for {@link AvroOutputFormat}.
- */
-public class AvroOutputFormatTest {
-
-	@Test
-	public void testSetCodec() throws Exception {
-		// given
-		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
-
-		// when
-		try {
-			outputFormat.setCodec(Codec.SNAPPY);
-		} catch (Exception ex) {
-			// then
-			fail("unexpected exception");
-		}
-	}
-
-	@Test
-	public void testSetCodecError() throws Exception {
-		// given
-		boolean error = false;
-		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
-
-		// when
-		try {
-			outputFormat.setCodec(null);
-		} catch (Exception ex) {
-			error = true;
-		}
-
-		// then
-		assertTrue(error);
-	}
-
-	@Test
-	public void testSerialization() throws Exception {
-
-		serializeAndDeserialize(null, null);
-		serializeAndDeserialize(null, User.SCHEMA$);
-		for (final Codec codec : Codec.values()) {
-			serializeAndDeserialize(codec, null);
-			serializeAndDeserialize(codec, User.SCHEMA$);
-		}
-	}
-
-	private void serializeAndDeserialize(final Codec codec, final Schema schema) throws IOException, ClassNotFoundException {
-		// given
-		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(User.class);
-		if (codec != null) {
-			outputFormat.setCodec(codec);
-		}
-		if (schema != null) {
-			outputFormat.setSchema(schema);
-		}
-
-		final ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
-		// when
-		try (final ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-			oos.writeObject(outputFormat);
-		}
-		try (final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
-			// then
-			Object o = ois.readObject();
-			assertTrue(o instanceof AvroOutputFormat);
-			final AvroOutputFormat<User> restored = (AvroOutputFormat<User>) o;
-			final Codec restoredCodec = (Codec) Whitebox.getInternalState(restored, "codec");
-			final Schema restoredSchema = (Schema) Whitebox.getInternalState(restored, "userDefinedSchema");
-
-			assertTrue(codec != null ? restoredCodec == codec : restoredCodec == null);
-			assertTrue(schema != null ? restoredSchema.equals(schema) : restoredSchema == null);
-		}
-	}
-
-	@Test
-	public void testCompression() throws Exception {
-		// given
-		final Path outputPath = new Path(File.createTempFile("avro-output-file", "avro").getAbsolutePath());
-		final AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(outputPath, User.class);
-		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-
-		final Path compressedOutputPath = new Path(File.createTempFile("avro-output-file", "compressed.avro").getAbsolutePath());
-		final AvroOutputFormat<User> compressedOutputFormat = new AvroOutputFormat<>(compressedOutputPath, User.class);
-		compressedOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-		compressedOutputFormat.setCodec(Codec.SNAPPY);
-
-		// when
-		output(outputFormat);
-		output(compressedOutputFormat);
-
-		// then
-		assertTrue(fileSize(outputPath) > fileSize(compressedOutputPath));
-
-		// cleanup
-		FileSystem fs = FileSystem.getLocalFileSystem();
-		fs.delete(outputPath, false);
-		fs.delete(compressedOutputPath, false);
-	}
-
-	private long fileSize(Path path) throws IOException {
-		return path.getFileSystem().getFileStatus(path).getLen();
-	}
-
-	private void output(final AvroOutputFormat<User> outputFormat) throws IOException {
-		outputFormat.configure(new Configuration());
-		outputFormat.open(1, 1);
-		for (int i = 0; i < 100; i++) {
-			outputFormat.writeRecord(new User("testUser", 1, "blue"));
-		}
-		outputFormat.close();
-	}
-
-	@Test
-	public void testGenericRecord() throws IOException {
-		final Path outputPath = new Path(File.createTempFile("avro-output-file", "generic.avro").getAbsolutePath());
-		final AvroOutputFormat<GenericRecord> outputFormat = new AvroOutputFormat<>(outputPath, GenericRecord.class);
-		Schema schema = new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"user\", \"fields\": [{\"name\":\"user_name\", \"type\":\"string\"}, {\"name\":\"favorite_number\", \"type\":\"int\"}, {\"name\":\"favorite_color\", \"type\":\"string\"}]}");
-		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);
-		outputFormat.setSchema(schema);
-		output(outputFormat, schema);
-
-		GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
-		DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new File(outputPath.getPath()), reader);
-
-		while (dataFileReader.hasNext()) {
-			GenericRecord record = dataFileReader.next();
-			assertEquals(record.get("user_name").toString(), "testUser");
-			assertEquals(record.get("favorite_number"), 1);
-			assertEquals(record.get("favorite_color").toString(), "blue");
-		}
-
-		//cleanup
-		FileSystem fs = FileSystem.getLocalFileSystem();
-		fs.delete(outputPath, false);
-
-	}
-
-	private void output(final AvroOutputFormat<GenericRecord> outputFormat, Schema schema) throws IOException {
-		outputFormat.configure(new Configuration());
-		outputFormat.open(1, 1);
-		for (int i = 0; i < 100; i++) {
-			GenericRecord record = new GenericData.Record(schema);
-			record.put("user_name", "testUser");
-			record.put("favorite_number", 1);
-			record.put("favorite_color", "blue");
-			outputFormat.writeRecord(record);
-		}
-		outputFormat.close();
-	}
-}
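
In a job, the setters covered by the deleted tests are applied before the format is handed to a sink. A fragment, assuming a DataSet<User> named users and a placeholder path:

    AvroOutputFormat<User> format =
        new AvroOutputFormat<>(new Path("file:///tmp/users.avro"), User.class);
    format.setCodec(AvroOutputFormat.Codec.SNAPPY);       // null is rejected (see testSetCodecError)
    format.setWriteMode(FileSystem.WriteMode.OVERWRITE);
    users.output(format);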

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/typeutils/AvroTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/typeutils/AvroTypeInfoTest.java b/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/typeutils/AvroTypeInfoTest.java
deleted file mode 100644
index e0bb1a1..0000000
--- a/flink-connectors/flink-avro/src/test/java/org/apache/flink/api/java/typeutils/AvroTypeInfoTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.User;
-
-/**
- * Test for {@link AvroTypeInfo}.
- */
-public class AvroTypeInfoTest extends TypeInformationTestBase<AvroTypeInfo<?>> {
-
-	@Override
-	protected AvroTypeInfo<?>[] getTestData() {
-		return new AvroTypeInfo<?>[] {
-			new AvroTypeInfo<>(Address.class),
-			new AvroTypeInfo<>(User.class),
-		};
-	}
-}
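
The type information under test is constructed the same way in user code; a short fragment (ExecutionConfig and TypeSerializer are the standard Flink types):

    AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);
    TypeSerializer<User> serializer = typeInfo.createSerializer(new ExecutionConfig());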

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc b/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
deleted file mode 100644
index ab8adf5..0000000
--- a/flink-connectors/flink-avro/src/test/resources/avro/user.avsc
+++ /dev/null
@@ -1,35 +0,0 @@
-[
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "Address",
- "fields": [
-     {"name": "num", "type": "int"},
-     {"name": "street", "type": "string"},
-     {"name": "city", "type": "string"},
-     {"name": "state", "type": "string"},
-     {"name": "zip", "type": "string"}
-  ]
-},
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]},
-     {"name": "type_long_test", "type": ["long", "null"]},
-     {"name": "type_double_test", "type": "double"},
-     {"name": "type_null_test", "type": ["null"]},
-     {"name": "type_bool_test", "type": ["boolean"]},
-     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
-     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
-     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
-     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
-     {"name": "type_map", "type": {"type": "map", "values": "long"}},
-     {"name": "type_fixed",
-                 "size": 16,
-                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] },
-     {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
-     {"name": "type_nested", "type": ["null", "Address"]}
- ]
-}]
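
The deleted .avsc declares two named records in one JSON array, which Avro's Schema.Parser reads as a union of the declared types. A fragment (resource path as above; parse() throws IOException):

    Schema union = new Schema.Parser().parse(new File("src/test/resources/avro/user.avsc"));
    Schema address = union.getTypes().get(0);   // "Address"
    Schema user    = union.getTypes().get(1);   // "User", incl. nullable unions such as favorite_number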

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties b/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
deleted file mode 100644
index 881dc06..0000000
--- a/flink-connectors/flink-avro/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/src/test/resources/testdata.avro b/flink-connectors/flink-avro/src/test/resources/testdata.avro
deleted file mode 100644
index 45308b9..0000000
Binary files a/flink-connectors/flink-avro/src/test/resources/testdata.avro and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-filesystem/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/pom.xml b/flink-connectors/flink-connector-filesystem/pom.xml
index f39758b..12a151e 100644
--- a/flink-connectors/flink-connector-filesystem/pom.xml
+++ b/flink-connectors/flink-connector-filesystem/pom.xml
@@ -57,6 +57,15 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-avro. -->
+			<optional>true</optional>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
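
The flink-avro dependency is declared provided and optional so that this connector compiles against it without forcing it onto downstream projects; the same stanza is added to the Kafka connector poms below. A project that actually uses the Avro code paths must declare the dependency itself, for example (Scala suffix spelled out, version a placeholder):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>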

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 317ee55..e931633 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -65,7 +65,7 @@ Usage:
 }
 </pre>
 */
-public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>  implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
+public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
 	private static final long serialVersionUID = 1L;
 	public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
 	public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 581640d..2b6660d 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -78,8 +78,16 @@ under the License.
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
-			<!-- Projects depending on this project,
-			won't depend on flink-table. -->
+			<!-- Projects depending on this project won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-0.11/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 475c842..162d5d0 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -78,8 +78,16 @@ under the License.
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
-			<!-- Projects depending on this project,
-			won't depend on flink-table. -->
+			<!-- Projects depending on this project won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index dd7a542..c990188 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -69,8 +69,16 @@ under the License.
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
-			<!-- Projects depending on this project,
-			won't depend on flink-table. -->
+			<!-- Projects depending on this project won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index fef070d..819d590 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -68,8 +68,16 @@ under the License.
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
-			<!-- Projects depending on this project,
-			won't depend on flink-table. -->
+			<!-- Projects depending on this project won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index c9f7de2..4f2fb45 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -62,8 +62,16 @@ under the License.
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
-			<!-- Projects depending on this project,
-			won't depend on flink-table. -->
+			<!-- Projects depending on this project won't depend on flink-table. -->
+			<optional>true</optional>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+			<!-- Projects depending on this project won't depend on flink-avro. -->
 			<optional>true</optional>
 		</dependency>
 
@@ -218,25 +226,6 @@ under the License.
 				<inherited>true</inherited>
 				<extensions>true</extensions>
 			</plugin>
-			<!-- Add Avro generated classes for testing. -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<execution>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>${project.basedir}/../flink-avro/src/test/java/org/apache/flink/api/io/avro/generated</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
deleted file mode 100644
index 0d36f4c..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.Utf8;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-
-/**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
- *
- * <p>Deserializes the <code>byte[]</code> messages into (nested) Flink Rows.
- *
- * <p>{@link Utf8} is converted to regular Java Strings.
- */
-public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
-
-	/**
-	 * Avro record class.
-	 */
-	private Class<? extends SpecificRecord> recordClazz;
-
-	/**
-	 * Schema for deterministic field order.
-	 */
-	private transient Schema schema;
-
-	/**
-	 * Reader that deserializes byte array into a record.
-	 */
-	private transient DatumReader<SpecificRecord> datumReader;
-
-	/**
-	 * Input stream to read message from.
-	 */
-	private transient MutableByteArrayInputStream inputStream;
-
-	/**
-	 * Avro decoder that decodes binary data.
-	 */
-	private transient Decoder decoder;
-
-	/**
-	 * Record to deserialize byte array to.
-	 */
-	private SpecificRecord record;
-
-	/**
-	 * Creates an Avro deserialization schema for the given record.
-	 *
-	 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
-	 */
-	public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
-		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
-		this.recordClazz = recordClazz;
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumReader = new SpecificDatumReader<>(schema);
-		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
-		this.inputStream = new MutableByteArrayInputStream();
-		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
-	}
-
-	@Override
-	public Row deserialize(byte[] message) throws IOException {
-		// read record
-		try {
-			inputStream.setBuffer(message);
-			this.record = datumReader.read(record, decoder);
-		} catch (IOException e) {
-			throw new RuntimeException("Failed to deserialize Row.", e);
-		}
-
-		// convert to row
-		final Object row = convertToRow(schema, record);
-		return (Row) row;
-	}
-
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.writeObject(recordClazz);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-		this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
-		this.schema = SpecificData.get().getSchema(recordClazz);
-		this.datumReader = new SpecificDatumReader<>(schema);
-		this.record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
-		this.inputStream = new MutableByteArrayInputStream();
-		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
-	}
-
-	/**
-	 * Converts a (nested) Avro {@link SpecificRecord} into Flink's Row type.
-	 * Avro's {@link Utf8} fields are converted into regular Java strings.
-	 */
-	private static Object convertToRow(Schema schema, Object recordObj) {
-		if (recordObj instanceof GenericRecord) {
-			// records can be wrapped in a union
-			if (schema.getType() == Schema.Type.UNION) {
-				final List<Schema> types = schema.getTypes();
-				if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
-					schema = types.get(1);
-				}
-				else {
-					throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD]. Given: " + schema);
-				}
-			} else if (schema.getType() != Schema.Type.RECORD) {
-				throw new RuntimeException("Record type for row type expected. But is: " + schema);
-			}
-			final List<Schema.Field> fields = schema.getFields();
-			final Row row = new Row(fields.size());
-			final GenericRecord record = (GenericRecord) recordObj;
-			for (int i = 0; i < fields.size(); i++) {
-				final Schema.Field field = fields.get(i);
-				row.setField(i, convertToRow(field.schema(), record.get(field.pos())));
-			}
-			return row;
-		} else if (recordObj instanceof Utf8) {
-			return recordObj.toString();
-		} else {
-			return recordObj;
-		}
-	}
-
-	/**
-	 * An extension of ByteArrayInputStream that allows the underlying buffer to be swapped
-	 * without creating a new ByteArrayInputStream instance. Re-using the same InputStream
-	 * instance avoids copying the message and creating a new Decoder for every message.
-	 */
-	private static final class MutableByteArrayInputStream extends ByteArrayInputStream {
-
-		public MutableByteArrayInputStream() {
-			super(new byte[0]);
-		}
-
-		/**
-		 * Set buffer that can be read via the InputStream interface and reset the input stream.
-		 * This has the same effect as creating a new ByteArrayInputStream with a new buffer.
-		 *
-		 * @param buf the new buffer to read.
-		 */
-		public void setBuffer(byte[] buf) {
-			this.buf = buf;
-			this.pos = 0;
-			this.count = buf.length;
-		}
-	}
-}
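
The schema removed here (part of the broader move to flink-avro) is easiest to understand in use. A fragment, assuming a generated User record and avroBytes, a byte[] holding one binary-encoded User (e.g. taken from a Kafka record):

    AvroRowDeserializationSchema deser = new AvroRowDeserializationSchema(User.class);
    Row row = deser.deserialize(avroBytes);
    String name = (String) row.getField(0);   // Utf8 fields arrive as java.lang.String
    // Nested records become nested Rows; only UNION[null, RECORD] unions are unwrapped.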