You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/07/03 15:17:33 UTC

[2/3] flink git commit: [FLINK-9444] [formats] Add full SQL support for Avro formats

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index 92d2c31..84849a6 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
@@ -47,6 +48,9 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,6 +59,8 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -100,15 +106,15 @@ public class AvroRecordInputFormatTest {
 	private Schema userSchema = new User().getSchema();
 
 	public static void writeTestFile(File testFile) throws IOException {
-		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		ArrayList<CharSequence> stringArray = new ArrayList<>();
 		stringArray.add(TEST_ARRAY_STRING_1);
 		stringArray.add(TEST_ARRAY_STRING_2);
 
-		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		ArrayList<Boolean> booleanArray = new ArrayList<>();
 		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
 		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
 
-		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		HashMap<CharSequence, Long> longMap = new HashMap<>();
 		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
 		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
 
@@ -130,6 +136,16 @@ public class AvroRecordInputFormatTest {
 		user1.setTypeEnum(TEST_ENUM_COLOR);
 		user1.setTypeMap(longMap);
 		user1.setTypeNested(addr);
+		user1.setTypeBytes(ByteBuffer.allocate(10));
+		user1.setTypeDate(LocalDate.parse("2014-03-01"));
+		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+		user1.setTypeTimeMicros(123456);
+		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+		user1.setTypeTimestampMicros(123456L);
+		// 20.00
+		user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+		// 20.00
+		user1.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 
 		// Construct via builder
 		User user2 = User.newBuilder()
@@ -140,20 +156,30 @@ public class AvroRecordInputFormatTest {
 				.setTypeDoubleTest(1.337d)
 				.setTypeNullTest(null)
 				.setTypeLongTest(1337L)
-				.setTypeArrayString(new ArrayList<CharSequence>())
-				.setTypeArrayBoolean(new ArrayList<Boolean>())
+				.setTypeArrayString(new ArrayList<>())
+				.setTypeArrayBoolean(new ArrayList<>())
 				.setTypeNullableArray(null)
 				.setTypeEnum(Colors.RED)
-				.setTypeMap(new HashMap<CharSequence, Long>())
+				.setTypeMap(new HashMap<>())
 				.setTypeFixed(null)
 				.setTypeUnion(null)
 				.setTypeNested(
 						Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
 								.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
 								.build())
+				.setTypeBytes(ByteBuffer.allocate(10))
+				.setTypeDate(LocalDate.parse("2014-03-01"))
+				.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+				.setTypeTimeMicros(123456)
+				.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+				.setTypeTimestampMicros(123456L)
+				// 20.00
+				.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+				// 20.00
+				.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 				.build();
-		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
 		dataFileWriter.create(user1.getSchema(), testFile);
 		dataFileWriter.append(user1);
 		dataFileWriter.append(user2);
@@ -167,14 +193,13 @@ public class AvroRecordInputFormatTest {
 	}
 
 	/**
-	 * Test if the AvroInputFormat is able to properly read data from an avro file.
-	 * @throws IOException
+	 * Test if the AvroInputFormat is able to properly read data from an Avro file.
 	 */
 	@Test
-	public void testDeserialisation() throws IOException {
+	public void testDeserialization() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(1);
@@ -216,14 +241,13 @@ public class AvroRecordInputFormatTest {
 	}
 
 	/**
-	 * Test if the AvroInputFormat is able to properly read data from an avro file.
-	 * @throws IOException
+	 * Test if the AvroInputFormat is able to properly read data from an Avro file.
 	 */
 	@Test
-	public void testDeserialisationReuseAvroRecordFalse() throws IOException {
+	public void testDeserializationReuseAvroRecordFalse() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 		format.setReuseAvroValue(false);
 
 		format.configure(parameters);
@@ -294,7 +318,7 @@ public class AvroRecordInputFormatTest {
 			ExecutionConfig ec = new ExecutionConfig();
 			assertEquals(GenericTypeInfo.class, te.getClass());
 
-			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<Class<?>>());
+			Serializers.recursivelyRegisterType(te.getTypeClass(), ec, new HashSet<>());
 
 			TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
 			assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
@@ -327,7 +351,7 @@ public class AvroRecordInputFormatTest {
 	@Test
 	public void testDeserializeToSpecificType() throws IOException {
 
-		DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
+		DatumReader<User> datumReader = new SpecificDatumReader<>(userSchema);
 
 		try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
 			User rec = dataFileReader.next();
@@ -365,15 +389,12 @@ public class AvroRecordInputFormatTest {
 	/**
 	 * Test if the AvroInputFormat is able to properly read data from an Avro
 	 * file as a GenericRecord.
-	 *
-	 * @throws IOException
 	 */
 	@Test
-	public void testDeserialisationGenericRecord() throws IOException {
+	public void testDeserializationGenericRecord() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
-				GenericRecord.class);
+		AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), GenericRecord.class);
 
 		doTestDeserializationGenericRecord(format, parameters);
 	}
@@ -440,17 +461,17 @@ public class AvroRecordInputFormatTest {
 	 * @throws IOException if there is an error
 	 */
 	@Test
-	public void testDeserialisationGenericRecordReuseAvroValueFalse() throws IOException {
+	public void testDeserializationGenericRecordReuseAvroValueFalse() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<GenericRecord> format = new AvroInputFormat<GenericRecord>(new Path(testFile.getAbsolutePath()),
-				GenericRecord.class);
+		AvroInputFormat<GenericRecord> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), GenericRecord.class);
 		format.configure(parameters);
 		format.setReuseAvroValue(false);
 
 		doTestDeserializationGenericRecord(format, parameters);
 	}
 
+	@SuppressWarnings("ResultOfMethodCallIgnored")
 	@After
 	public void deleteFiles() {
 		testFile.delete();

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
index 1d98c14..de50f27 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
@@ -23,8 +23,9 @@ import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -37,8 +38,8 @@ import static org.junit.Assert.assertEquals;
 public class AvroRowDeSerializationSchemaTest {
 
 	@Test
-	public void testSerializeDeserializeSimpleRow() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+	public void testSpecificSerializeDeserializeFromClass() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -50,14 +51,13 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testSerializeSimpleRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+	public void testSpecificSerializeDeserializeFromSchema() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+		final String schemaString = testData.f1.getSchema().toString();
 
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
 
-		serializationSchema.serialize(testData.f2);
-		serializationSchema.serialize(testData.f2);
 		final byte[] bytes = serializationSchema.serialize(testData.f2);
 		final Row actual = deserializationSchema.deserialize(bytes);
 
@@ -65,27 +65,27 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testDeserializeRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
+	public void testGenericSerializeDeserialize() throws IOException {
+		final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
 
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
 
-		final byte[] bytes = serializationSchema.serialize(testData.f2);
-		deserializationSchema.deserialize(bytes);
-		deserializationSchema.deserialize(bytes);
+		final byte[] bytes = serializationSchema.serialize(testData.f1);
 		final Row actual = deserializationSchema.deserialize(bytes);
 
-		assertEquals(testData.f2, actual);
+		assertEquals(testData.f1, actual);
 	}
 
 	@Test
-	public void testSerializeDeserializeComplexRow() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+	public void testSpecificSerializeFromClassSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
 
+		serializationSchema.serialize(testData.f2);
+		serializationSchema.serialize(testData.f2);
 		final byte[] bytes = serializationSchema.serialize(testData.f2);
 		final Row actual = deserializationSchema.deserialize(bytes);
 
@@ -93,11 +93,12 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testSerializeComplexRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+	public void testSpecificSerializeFromSchemaSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+		final String schemaString = testData.f1.getSchema().toString();
 
-		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
 
 		serializationSchema.serialize(testData.f2);
 		serializationSchema.serialize(testData.f2);
@@ -108,8 +109,23 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testDeserializeComplexRowSeveralTimes() throws IOException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+	public void testGenericSerializeSeveralTimes() throws IOException {
+		final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
+
+		serializationSchema.serialize(testData.f1);
+		serializationSchema.serialize(testData.f1);
+		final byte[] bytes = serializationSchema.serialize(testData.f1);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f1, actual);
+	}
+
+	@Test
+	public void testSpecificDeserializeFromClassSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
 
 		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
 		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
@@ -123,25 +139,66 @@ public class AvroRowDeSerializationSchemaTest {
 	}
 
 	@Test
-	public void testSerializability() throws IOException, ClassNotFoundException {
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
+	public void testSpecificDeserializeFromSchemaSeveralTimes() throws IOException {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+		final String schemaString = testData.f1.getSchema().toString();
 
-		final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
-		final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(schemaString);
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(schemaString);
 
-		byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
-		byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);
+		final byte[] bytes = serializationSchema.serialize(testData.f2);
+		deserializationSchema.deserialize(bytes);
+		deserializationSchema.deserialize(bytes);
+		final Row actual = deserializationSchema.deserialize(bytes);
 
-		AvroRowSerializationSchema serCopy =
+		assertEquals(testData.f2, actual);
+	}
+
+	@Test
+	public void testGenericDeserializeSeveralTimes() throws IOException {
+		final Tuple3<GenericRecord, Row, Schema> testData = AvroTestUtils.getGenericTestData();
+
+		final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f2.toString());
+		final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f2.toString());
+
+		final byte[] bytes = serializationSchema.serialize(testData.f1);
+		deserializationSchema.deserialize(bytes);
+		deserializationSchema.deserialize(bytes);
+		final Row actual = deserializationSchema.deserialize(bytes);
+
+		assertEquals(testData.f1, actual);
+	}
+
+	@Test
+	public void testSerializability() throws Exception {
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
+		final String schemaString = testData.f1.getSchema().toString();
+
+		// from class
+		final AvroRowSerializationSchema classSer = new AvroRowSerializationSchema(testData.f0);
+		final AvroRowDeserializationSchema classDeser = new AvroRowDeserializationSchema(testData.f0);
+		testSerializability(classSer, classDeser, testData.f2);
+
+		// from schema string
+		final AvroRowSerializationSchema schemaSer = new AvroRowSerializationSchema(schemaString);
+		final AvroRowDeserializationSchema schemaDeser = new AvroRowDeserializationSchema(schemaString);
+		testSerializability(schemaSer, schemaDeser, testData.f2);
+	}
+
+	private void testSerializability(AvroRowSerializationSchema ser, AvroRowDeserializationSchema deser, Row data) throws Exception {
+		final byte[] serBytes = InstantiationUtil.serializeObject(ser);
+		final byte[] deserBytes = InstantiationUtil.serializeObject(deser);
+
+		final AvroRowSerializationSchema serCopy =
 			InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
-		AvroRowDeserializationSchema deserCopy =
+		final AvroRowDeserializationSchema deserCopy =
 			InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());
 
-		final byte[] bytes = serCopy.serialize(testData.f2);
+		final byte[] bytes = serCopy.serialize(data);
 		deserCopy.deserialize(bytes);
 		deserCopy.deserialize(bytes);
 		final Row actual = deserCopy.deserialize(bytes);
 
-		assertEquals(testData.f2, actual);
+		assertEquals(data, actual);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index 40a84f9..fee81a8 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -25,11 +25,15 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,6 +41,8 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Random;
@@ -67,7 +73,7 @@ public class AvroSplittableInputFormatTest {
 	static final String TEST_MAP_KEY2 = "KEY 2";
 	static final long TEST_MAP_VALUE2 = 17554L;
 
-	static final Integer TEST_NUM = new Integer(239);
+	static final Integer TEST_NUM = 239;
 	static final String TEST_STREET = "Baker Street";
 	static final String TEST_CITY = "London";
 	static final String TEST_STATE = "London";
@@ -79,20 +85,20 @@ public class AvroSplittableInputFormatTest {
 	public void createFiles() throws IOException {
 		testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
 
-		ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
+		ArrayList<CharSequence> stringArray = new ArrayList<>();
 		stringArray.add(TEST_ARRAY_STRING_1);
 		stringArray.add(TEST_ARRAY_STRING_2);
 
-		ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
+		ArrayList<Boolean> booleanArray = new ArrayList<>();
 		booleanArray.add(TEST_ARRAY_BOOLEAN_1);
 		booleanArray.add(TEST_ARRAY_BOOLEAN_2);
 
-		HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
+		HashMap<CharSequence, Long> longMap = new HashMap<>();
 		longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
 		longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
 
 		Address addr = new Address();
-		addr.setNum(new Integer(TEST_NUM));
+		addr.setNum(TEST_NUM);
 		addr.setStreet(TEST_STREET);
 		addr.setCity(TEST_CITY);
 		addr.setState(TEST_STATE);
@@ -108,6 +114,16 @@ public class AvroSplittableInputFormatTest {
 		user1.setTypeEnum(TEST_ENUM_COLOR);
 		user1.setTypeMap(longMap);
 		user1.setTypeNested(addr);
+		user1.setTypeBytes(ByteBuffer.allocate(10));
+		user1.setTypeDate(LocalDate.parse("2014-03-01"));
+		user1.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+		user1.setTypeTimeMicros(123456);
+		user1.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+		user1.setTypeTimestampMicros(123456L);
+		// 20.00
+		user1.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+		// 20.00
+		user1.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 
 		// Construct via builder
 		User user2 = User.newBuilder()
@@ -118,20 +134,30 @@ public class AvroSplittableInputFormatTest {
 				.setTypeDoubleTest(1.337d)
 				.setTypeNullTest(null)
 				.setTypeLongTest(1337L)
-				.setTypeArrayString(new ArrayList<CharSequence>())
-				.setTypeArrayBoolean(new ArrayList<Boolean>())
+				.setTypeArrayString(new ArrayList<>())
+				.setTypeArrayBoolean(new ArrayList<>())
 				.setTypeNullableArray(null)
 				.setTypeEnum(Colors.RED)
-				.setTypeMap(new HashMap<CharSequence, Long>())
+				.setTypeMap(new HashMap<>())
 				.setTypeFixed(new Fixed16())
 				.setTypeUnion(123L)
 				.setTypeNested(
 						Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
 								.setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
 								.build())
+				.setTypeBytes(ByteBuffer.allocate(10))
+				.setTypeDate(LocalDate.parse("2014-03-01"))
+				.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+				.setTypeTimeMicros(123456)
+				.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+				.setTypeTimestampMicros(123456L)
+				// 20.00
+				.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+				// 20.00
+				.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 				.build();
-		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-		DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
+		DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
+		DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
 		dataFileWriter.create(user1.getSchema(), testFile);
 		dataFileWriter.append(user1);
 		dataFileWriter.append(user2);
@@ -148,12 +174,22 @@ public class AvroSplittableInputFormatTest {
 			user.setTypeEnum(TEST_ENUM_COLOR);
 			user.setTypeMap(longMap);
 			Address address = new Address();
-			address.setNum(new Integer(TEST_NUM));
+			address.setNum(TEST_NUM);
 			address.setStreet(TEST_STREET);
 			address.setCity(TEST_CITY);
 			address.setState(TEST_STATE);
 			address.setZip(TEST_ZIP);
 			user.setTypeNested(address);
+			user.setTypeBytes(ByteBuffer.allocate(10));
+			user.setTypeDate(LocalDate.parse("2014-03-01"));
+			user.setTypeTimeMillis(LocalTime.parse("12:12:12"));
+			user.setTypeTimeMicros(123456);
+			user.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"));
+			user.setTypeTimestampMicros(123456L);
+			// 20.00
+			user.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+			// 20.00
+			user.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
 
 			dataFileWriter.append(user);
 		}
@@ -164,7 +200,7 @@ public class AvroSplittableInputFormatTest {
 	public void testSplittedIF() throws IOException {
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(4);
@@ -182,10 +218,10 @@ public class AvroSplittableInputFormatTest {
 			format.close();
 		}
 
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(1604, elementsPerSplit[0]);
+		Assert.assertEquals(1203, elementsPerSplit[1]);
+		Assert.assertEquals(1203, elementsPerSplit[2]);
+		Assert.assertEquals(990, elementsPerSplit[3]);
 		Assert.assertEquals(NUM_RECORDS, elements);
 		format.close();
 	}
@@ -196,7 +232,7 @@ public class AvroSplittableInputFormatTest {
 
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 		format.configure(parameters);
 
 		FileInputSplit[] splits = format.createInputSplits(4);
@@ -228,10 +264,10 @@ public class AvroSplittableInputFormatTest {
 			format.close();
 		}
 
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(1604, elementsPerSplit[0]);
+		Assert.assertEquals(1203, elementsPerSplit[1]);
+		Assert.assertEquals(1203, elementsPerSplit[2]);
+		Assert.assertEquals(990, elementsPerSplit[3]);
 		Assert.assertEquals(NUM_RECORDS, elements);
 		format.close();
 	}
@@ -242,7 +278,7 @@ public class AvroSplittableInputFormatTest {
 
 		Configuration parameters = new Configuration();
 
-		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		AvroInputFormat<User> format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
 		format.configure(parameters);
 
 		FileInputSplit[] splits = format.createInputSplits(4);
@@ -274,10 +310,10 @@ public class AvroSplittableInputFormatTest {
 			format.close();
 		}
 
-		Assert.assertEquals(1539, elementsPerSplit[0]);
-		Assert.assertEquals(1026, elementsPerSplit[1]);
-		Assert.assertEquals(1539, elementsPerSplit[2]);
-		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(1604, elementsPerSplit[0]);
+		Assert.assertEquals(1203, elementsPerSplit[1]);
+		Assert.assertEquals(1203, elementsPerSplit[2]);
+		Assert.assertEquals(990, elementsPerSplit[3]);
 		Assert.assertEquals(NUM_RECORDS, elements);
 		format.close();
 	}
@@ -287,11 +323,23 @@ public class AvroSplittableInputFormatTest {
 
 	This dependency needs to be added
 
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro-mapred</artifactId>
-            <version>1.7.6</version>
-        </dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro-mapred</artifactId>
+			<version>1.7.6</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-compatibility_2.11</artifactId>
+			<version>1.6-SNAPSHOT</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>16.0</version>
+		</dependency>
 
 	@Test
 	public void testHadoop() throws Exception {
@@ -314,10 +362,11 @@ public class AvroSplittableInputFormatTest {
 			}
 			i++;
 		}
-		System.out.println("Status "+Arrays.toString(elementsPerSplit));
-	} **/
+		System.out.println("Status " + Arrays.toString(elementsPerSplit));
+	} */
 
 	@After
+	@SuppressWarnings("ResultOfMethodCallIgnored")
 	public void deleteFiles() {
 		testFile.delete();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index 87e169b..49ef985 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.avro;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
@@ -28,12 +29,17 @@ import org.apache.flink.util.StringUtils;
 
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -49,6 +55,7 @@ import static org.junit.Assert.fail;
  * Tests the {@link DataOutputEncoder} and {@link DataInputDecoder} classes for Avro serialization.
  */
 public class EncoderDecoderTest {
+
 	@Test
 	public void testComplexStringsDirecty() {
 		try {
@@ -93,56 +100,56 @@ public class EncoderDecoderTest {
 	@Test
 	public void testPrimitiveTypes() {
 
-		testObjectSerialization(new Boolean(true));
-		testObjectSerialization(new Boolean(false));
-
-		testObjectSerialization(Byte.valueOf((byte) 0));
-		testObjectSerialization(Byte.valueOf((byte) 1));
-		testObjectSerialization(Byte.valueOf((byte) -1));
-		testObjectSerialization(Byte.valueOf(Byte.MIN_VALUE));
-		testObjectSerialization(Byte.valueOf(Byte.MAX_VALUE));
-
-		testObjectSerialization(Short.valueOf((short) 0));
-		testObjectSerialization(Short.valueOf((short) 1));
-		testObjectSerialization(Short.valueOf((short) -1));
-		testObjectSerialization(Short.valueOf(Short.MIN_VALUE));
-		testObjectSerialization(Short.valueOf(Short.MAX_VALUE));
-
-		testObjectSerialization(Integer.valueOf(0));
-		testObjectSerialization(Integer.valueOf(1));
-		testObjectSerialization(Integer.valueOf(-1));
-		testObjectSerialization(Integer.valueOf(Integer.MIN_VALUE));
-		testObjectSerialization(Integer.valueOf(Integer.MAX_VALUE));
-
-		testObjectSerialization(Long.valueOf(0));
-		testObjectSerialization(Long.valueOf(1));
-		testObjectSerialization(Long.valueOf(-1));
-		testObjectSerialization(Long.valueOf(Long.MIN_VALUE));
-		testObjectSerialization(Long.valueOf(Long.MAX_VALUE));
-
-		testObjectSerialization(Float.valueOf(0));
-		testObjectSerialization(Float.valueOf(1));
-		testObjectSerialization(Float.valueOf(-1));
-		testObjectSerialization(Float.valueOf((float) Math.E));
-		testObjectSerialization(Float.valueOf((float) Math.PI));
-		testObjectSerialization(Float.valueOf(Float.MIN_VALUE));
-		testObjectSerialization(Float.valueOf(Float.MAX_VALUE));
-		testObjectSerialization(Float.valueOf(Float.MIN_NORMAL));
-		testObjectSerialization(Float.valueOf(Float.NaN));
-		testObjectSerialization(Float.valueOf(Float.NEGATIVE_INFINITY));
-		testObjectSerialization(Float.valueOf(Float.POSITIVE_INFINITY));
-
-		testObjectSerialization(Double.valueOf(0));
-		testObjectSerialization(Double.valueOf(1));
-		testObjectSerialization(Double.valueOf(-1));
-		testObjectSerialization(Double.valueOf(Math.E));
-		testObjectSerialization(Double.valueOf(Math.PI));
-		testObjectSerialization(Double.valueOf(Double.MIN_VALUE));
-		testObjectSerialization(Double.valueOf(Double.MAX_VALUE));
-		testObjectSerialization(Double.valueOf(Double.MIN_NORMAL));
-		testObjectSerialization(Double.valueOf(Double.NaN));
-		testObjectSerialization(Double.valueOf(Double.NEGATIVE_INFINITY));
-		testObjectSerialization(Double.valueOf(Double.POSITIVE_INFINITY));
+		testObjectSerialization(Boolean.TRUE);
+		testObjectSerialization(Boolean.FALSE);
+
+		testObjectSerialization((byte) 0);
+		testObjectSerialization((byte) 1);
+		testObjectSerialization((byte) -1);
+		testObjectSerialization(Byte.MIN_VALUE);
+		testObjectSerialization(Byte.MAX_VALUE);
+
+		testObjectSerialization((short) 0);
+		testObjectSerialization((short) 1);
+		testObjectSerialization((short) -1);
+		testObjectSerialization(Short.MIN_VALUE);
+		testObjectSerialization(Short.MAX_VALUE);
+
+		testObjectSerialization(0);
+		testObjectSerialization(1);
+		testObjectSerialization(-1);
+		testObjectSerialization(Integer.MIN_VALUE);
+		testObjectSerialization(Integer.MAX_VALUE);
+
+		testObjectSerialization(0L);
+		testObjectSerialization(1L);
+		testObjectSerialization((long) -1);
+		testObjectSerialization(Long.MIN_VALUE);
+		testObjectSerialization(Long.MAX_VALUE);
+
+		testObjectSerialization(0f);
+		testObjectSerialization(1f);
+		testObjectSerialization((float) -1);
+		testObjectSerialization((float) Math.E);
+		testObjectSerialization((float) Math.PI);
+		testObjectSerialization(Float.MIN_VALUE);
+		testObjectSerialization(Float.MAX_VALUE);
+		testObjectSerialization(Float.MIN_NORMAL);
+		testObjectSerialization(Float.NaN);
+		testObjectSerialization(Float.NEGATIVE_INFINITY);
+		testObjectSerialization(Float.POSITIVE_INFINITY);
+
+		testObjectSerialization(0d);
+		testObjectSerialization(1d);
+		testObjectSerialization((double) -1);
+		testObjectSerialization(Math.E);
+		testObjectSerialization(Math.PI);
+		testObjectSerialization(Double.MIN_VALUE);
+		testObjectSerialization(Double.MAX_VALUE);
+		testObjectSerialization(Double.MIN_NORMAL);
+		testObjectSerialization(Double.NaN);
+		testObjectSerialization(Double.NEGATIVE_INFINITY);
+		testObjectSerialization(Double.POSITIVE_INFINITY);
 
 		testObjectSerialization("");
 		testObjectSerialization("abcdefg");
@@ -209,7 +216,7 @@ public class EncoderDecoderTest {
 
 		// object with collection
 		{
-			ArrayList<String> list = new ArrayList<String>();
+			ArrayList<String> list = new ArrayList<>();
 			list.add("A");
 			list.add("B");
 			list.add("C");
@@ -221,7 +228,7 @@ public class EncoderDecoderTest {
 
 		// object with empty collection
 		{
-			ArrayList<String> list = new ArrayList<String>();
+			ArrayList<String> list = new ArrayList<>();
 			testObjectSerialization(new BookAuthor(987654321L, list, "The Saurus"));
 		}
 	}
@@ -235,7 +242,7 @@ public class EncoderDecoderTest {
 	public void testGeneratedObjectWithNullableFields() {
 		List<CharSequence> strings = Arrays.asList(new CharSequence[] { "These", "strings", "should", "be", "recognizable", "as", "a", "meaningful", "sequence" });
 		List<Boolean> bools = Arrays.asList(true, true, false, false, true, false, true, true);
-		Map<CharSequence, Long> map = new HashMap<CharSequence, Long>();
+		Map<CharSequence, Long> map = new HashMap<>();
 		map.put("1", 1L);
 		map.put("2", 2L);
 		map.put("3", 3L);
@@ -243,11 +250,31 @@ public class EncoderDecoderTest {
 		byte[] b = new byte[16];
 		new Random().nextBytes(b);
 		Fixed16 f = new Fixed16(b);
-		Address addr = new Address(new Integer(239), "6th Main", "Bangalore",
-				"Karnataka", "560075");
-		User user = new User("Freudenreich", 1337, "macintosh gray",
-				1234567890L, 3.1415926, null, true, strings, bools, null,
-				Colors.GREEN, map, f, new Boolean(true), addr);
+		Address addr = new Address(239, "6th Main", "Bangalore", "Karnataka", "560075");
+		User user = new User(
+			"Freudenreich",
+			1337,
+			"macintosh gray",
+			1234567890L,
+			3.1415926,
+			null,
+			true,
+			strings,
+			bools,
+			null,
+			Colors.GREEN,
+			map,
+			f,
+			Boolean.TRUE,
+			addr,
+			ByteBuffer.wrap(b),
+			LocalDate.parse("2014-03-01"),
+			LocalTime.parse("12:12:12"),
+			123456,
+			DateTime.parse("2014-03-01T12:12:12.321Z"),
+			123456L,
+			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
+			new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())); // 20.00
 
 		testObjectSerialization(user);
 	}
@@ -301,7 +328,7 @@ public class EncoderDecoderTest {
 
 				@SuppressWarnings("unchecked")
 				Class<X> clazz = (Class<X>) obj.getClass();
-				ReflectDatumWriter<X> writer = new ReflectDatumWriter<X>(clazz);
+				ReflectDatumWriter<X> writer = new ReflectDatumWriter<>(clazz);
 
 				writer.write(obj, encoder);
 				dataOut.flush();
@@ -309,7 +336,7 @@ public class EncoderDecoderTest {
 			}
 
 			byte[] data = baos.toByteArray();
-			X result = null;
+			X result;
 
 			// deserialize
 			{
@@ -320,7 +347,7 @@ public class EncoderDecoderTest {
 
 				@SuppressWarnings("unchecked")
 				Class<X> clazz = (Class<X>) obj.getClass();
-				ReflectDatumReader<X> reader = new ReflectDatumReader<X>(clazz);
+				ReflectDatumReader<X> reader = new ReflectDatumReader<>(clazz);
 
 				// create a reuse object if possible, otherwise we have no reuse object
 				X reuse = null;
@@ -328,7 +355,9 @@ public class EncoderDecoderTest {
 					@SuppressWarnings("unchecked")
 					X test = (X) obj.getClass().newInstance();
 					reuse = test;
-				} catch (Throwable t) {}
+				} catch (Throwable t) {
+					// do nothing
+				}
 
 				result = reader.read(reuse, decoder);
 			}
@@ -427,7 +456,7 @@ public class EncoderDecoderTest {
 		public ComplexNestedObject1(int offInit) {
 			this.doubleValue = 6293485.6723 + offInit;
 
-			this.stringList = new ArrayList<String>();
+			this.stringList = new ArrayList<>();
 			this.stringList.add("A" + offInit);
 			this.stringList.add("somewhat" + offInit);
 			this.stringList.add("random" + offInit);
@@ -458,7 +487,7 @@ public class EncoderDecoderTest {
 		public ComplexNestedObject2(boolean init) {
 			this.longValue = 46547;
 
-			this.theMap = new HashMap<String, ComplexNestedObject1>();
+			this.theMap = new HashMap<>();
 			this.theMap.put("36354L", new ComplexNestedObject1(43546543));
 			this.theMap.put("785611L", new ComplexNestedObject1(45784568));
 			this.theMap.put("43L", new ComplexNestedObject1(9876543));

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
new file mode 100644
index 0000000..be0ddc4
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+	@Test
+	public void testAvroClassConversion() {
+		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(User.class));
+	}
+
+	@Test
+	public void testAvroSchemaConversion() {
+		final String schema = User.getClassSchema().toString(true);
+		validateUserSchema(AvroSchemaConverter.convertToTypeInfo(schema));
+	}
+
+	private void validateUserSchema(TypeInformation<?> actual) {
+		final TypeInformation<Row> address = Types.ROW_NAMED(
+			new String[]{
+				"num",
+				"street",
+				"city",
+				"state",
+				"zip"},
+			Types.INT,
+			Types.STRING,
+			Types.STRING,
+			Types.STRING,
+			Types.STRING);
+
+		final TypeInformation<Row> user = Types.ROW_NAMED(
+			new String[] {
+				"name",
+				"favorite_number",
+				"favorite_color",
+				"type_long_test",
+				"type_double_test",
+				"type_null_test",
+				"type_bool_test",
+				"type_array_string",
+				"type_array_boolean",
+				"type_nullable_array",
+				"type_enum",
+				"type_map",
+				"type_fixed",
+				"type_union",
+				"type_nested",
+				"type_bytes",
+				"type_date",
+				"type_time_millis",
+				"type_time_micros",
+				"type_timestamp_millis",
+				"type_timestamp_micros",
+				"type_decimal_bytes",
+				"type_decimal_fixed"},
+			Types.STRING,
+			Types.INT,
+			Types.STRING,
+			Types.LONG,
+			Types.DOUBLE,
+			Types.VOID,
+			Types.BOOLEAN,
+			Types.OBJECT_ARRAY(Types.STRING),
+			Types.OBJECT_ARRAY(Types.BOOLEAN),
+			Types.OBJECT_ARRAY(Types.STRING),
+			Types.STRING,
+			Types.MAP(Types.STRING, Types.LONG),
+			Types.PRIMITIVE_ARRAY(Types.BYTE),
+			Types.GENERIC(Object.class),
+			address,
+			Types.PRIMITIVE_ARRAY(Types.BYTE),
+			Types.SQL_DATE,
+			Types.SQL_TIME,
+			Types.INT,
+			Types.SQL_TIMESTAMP,
+			Types.LONG,
+			Types.BIG_DEC,
+			Types.BIG_DEC);
+
+		assertEquals(user, actual);
+
+		final RowTypeInfo userRowInfo = (RowTypeInfo) user;
+		assertTrue(userRowInfo.schemaEquals(actual));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index fbabb95..ccba0a5 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.avro.typeutils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -31,7 +32,6 @@ import org.apache.flink.formats.avro.AvroRecordInputFormatTest;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -52,6 +52,7 @@ import java.util.Map;
  */
 @RunWith(Parameterized.class)
 public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
+
 	public AvroTypeExtractionTest(TestExecutionMode mode) {
 		super(mode);
 	}
@@ -80,7 +81,7 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users)
 				.map((value) -> value);
 
@@ -88,8 +89,19 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, " +
+			"\"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], " +
+			"\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, " +
+			"\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
+			"\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
+			"\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
+			"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, " +
+			"\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, " +
+			"\"type_enum\": \"RED\", \"type_map\": {}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
+			"\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, " +
+			"\"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n";
 	}
 
 	@Test
@@ -98,24 +110,31 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		env.getConfig().enableForceAvro();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users)
-				.map(new MapFunction<User, User>() {
-					@Override
-					public User map(User value) throws Exception {
-						Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1);
-						ab.put("hehe", 12L);
-						value.setTypeMap(ab);
-						return value;
-					}
+				.map((MapFunction<User, User>) value -> {
+					Map<CharSequence, Long> ab = new HashMap<>(1);
+					ab.put("hehe", 12L);
+					value.setTypeMap(ab);
+					return value;
 				});
 
 		usersDS.writeAsText(resultPath);
 
 		env.execute("Simple Avro read job");
 
-		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-					"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
+		expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, " +
+			"\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, " +
+			"\"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", " +
+			"\"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, \"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, " +
+			"\"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, \"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, " +
+			"\"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, \"type_decimal_fixed\": [7, -48]}\n" +
+			"{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, " +
+			"\"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, " +
+			"\"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}, " +
+			"\"type_bytes\": {\"bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\"}, \"type_date\": 2014-03-01, \"type_time_millis\": 12:12:12.000, " +
+			"\"type_time_micros\": 123456, \"type_timestamp_millis\": 2014-03-01T12:12:12.321Z, \"type_timestamp_micros\": 123456, \"type_decimal_bytes\": {\"bytes\": \"\\u0007Ð\"}, " +
+			"\"type_decimal_fixed\": [7, -48]}\n";
 
 	}
 
@@ -125,17 +144,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		env.getConfig().enableObjectReuse();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users);
 
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+		DataSet<Tuple2<String, Integer>> res = usersDS
+			.groupBy("name")
+			.reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
 				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+					out.collect(new Tuple2<>(u.getName().toString(), 1));
 				}
-			}
-		});
+			})
+			.returns(Types.TUPLE(Types.STRING, Types.INT));
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
 
@@ -148,22 +167,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		env.getConfig().enableForceAvro();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users);
 
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
-			@Override
-			public String getKey(User value) throws Exception {
-				return String.valueOf(value.getName());
-			}
-		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+		DataSet<Tuple2<String, Integer>> res = usersDS
+			.groupBy((KeySelector<User, String>) value -> String.valueOf(value.getName()))
+			.reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
 				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+					out.collect(new Tuple2<>(u.getName().toString(), 1));
 				}
-			}
-		});
+			})
+			.returns(Types.TUPLE(Types.STRING, Types.INT));
 
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
@@ -177,22 +191,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		env.getConfig().enableForceKryo();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users);
 
-		DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
-			@Override
-			public String getKey(User value) throws Exception {
-				return String.valueOf(value.getName());
-			}
-		}).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+		DataSet<Tuple2<String, Integer>> res = usersDS
+			.groupBy((KeySelector<User, String>) value -> String.valueOf(value.getName()))
+			.reduceGroup((GroupReduceFunction<User, Tuple2<String, Integer>>) (values, out) -> {
 				for (User u : values) {
-					out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+					out.collect(new Tuple2<>(u.getName().toString(), 1));
 				}
-			}
-		});
+			})
+			.returns(Types.TUPLE(Types.STRING, Types.INT));
 
 		res.writeAsText(resultPath);
 		env.execute("Avro Key selection");
@@ -216,17 +225,17 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Path in = new Path(inFile.getAbsoluteFile().toURI());
 
-		AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+		AvroInputFormat<User> users = new AvroInputFormat<>(in, User.class);
 		DataSet<User> usersDS = env.createInput(users);
 
-		DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
-			@Override
-			public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
+		DataSet<Object> res = usersDS
+			.groupBy(fieldName)
+			.reduceGroup((GroupReduceFunction<User, Object>) (values, out) -> {
 				for (User u : values) {
 					out.collect(u.get(fieldName));
 				}
-			}
-		});
+			})
+			.returns(Object.class);
 		res.writeAsText(resultPath);
 		env.execute("Simple Avro read job");
 
@@ -234,14 +243,19 @@ public class AvroTypeExtractionTest extends MultipleProgramsTestBase {
 		ExecutionConfig ec = env.getConfig();
 		Assert.assertTrue(ec.getRegisteredKryoTypes().contains(Fixed16.class));
 
-		if (fieldName.equals("name")) {
-			expected = "Alyssa\nCharlie";
-		} else if (fieldName.equals("type_enum")) {
-			expected = "GREEN\nRED\n";
-		} else if (fieldName.equals("type_double_test")) {
-			expected = "123.45\n1.337\n";
-		} else {
-			Assert.fail("Unknown field");
+		switch (fieldName) {
+			case "name":
+				expected = "Alyssa\nCharlie";
+				break;
+			case "type_enum":
+				expected = "GREEN\nRED\n";
+				break;
+			case "type_double_test":
+				expected = "123.45\n1.337\n";
+				break;
+			default:
+				Assert.fail("Unknown field");
+				break;
 		}
 
 		after();

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index f641636..cfd1506 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.generated.SimpleUser;
 import org.apache.flink.formats.avro.utils.TestDataGenerator;
 
 import org.junit.Test;
@@ -51,12 +51,16 @@ import static org.junit.Assert.assertTrue;
  * works properly.
  *
  * <p>This test can be dropped once we drop backwards compatibility with Flink 1.3 snapshots.
+ *
+ * <p>The {@link BackwardsCompatibleAvroSerializer} does not support custom Kryo registrations (which
+ * logical types require for Avro 1.8 because Kryo does not support Joda-Time). We introduced a
+ * simpler user record for pre-Avro 1.8 test cases.
  */
 public class BackwardsCompatibleAvroSerializerTest {
 
-	private static final String SNAPSHOT_RESOURCE = "flink-1.3-avro-type-serializer-snapshot";
+	private static final String SNAPSHOT_RESOURCE = "flink-1.6-avro-type-serializer-snapshot";
 
-	private static final String DATA_RESOURCE = "flink-1.3-avro-type-serialized-data";
+	private static final String DATA_RESOURCE = "flink-1.6-avro-type-serialized-data";
 
 	@SuppressWarnings("unused")
 	private static final String SNAPSHOT_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + SNAPSHOT_RESOURCE;
@@ -73,7 +77,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 
 		// retrieve the old config snapshot
 
-		final TypeSerializer<User> serializer;
+		final TypeSerializer<SimpleUser> serializer;
 		final TypeSerializerConfigSnapshot configSnapshot;
 
 		try (InputStream in = getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
@@ -86,7 +90,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 			assertEquals(1, deserialized.size());
 
 			@SuppressWarnings("unchecked")
-			final TypeSerializer<User> typedSerializer = (TypeSerializer<User>) deserialized.get(0).f0;
+			final TypeSerializer<SimpleUser> typedSerializer = (TypeSerializer<SimpleUser>) deserialized.get(0).f0;
 
 			serializer = typedSerializer;
 			configSnapshot = deserialized.get(0).f1;
@@ -104,14 +108,14 @@ public class BackwardsCompatibleAvroSerializerTest {
 		// sanity check for the test: check that a PoJoSerializer and the original serializer work together
 		assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
 
-		final TypeSerializer<User> newSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+		final TypeSerializer<SimpleUser> newSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
 		assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
 
 		// deserialize the data and make sure this still works
 		validateDeserialization(newSerializer);
 
 		TypeSerializerConfigSnapshot nextSnapshot = newSerializer.snapshotConfiguration();
-		final TypeSerializer<User> nextSerializer = new AvroTypeInfo<>(User.class, true).createSerializer(new ExecutionConfig());
+		final TypeSerializer<SimpleUser> nextSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
 
 		assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
 
@@ -119,7 +123,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 		validateDeserialization(nextSerializer);
 	}
 
-	private static void validateDeserialization(TypeSerializer<User> serializer) throws IOException {
+	private static void validateDeserialization(TypeSerializer<SimpleUser> serializer) throws IOException {
 		final Random rnd = new Random(RANDOM_SEED);
 
 		try (InputStream in = BackwardsCompatibleAvroSerializerTest.class.getClassLoader()
@@ -128,10 +132,10 @@ public class BackwardsCompatibleAvroSerializerTest {
 			final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
 
 			for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-				final User deserialized = serializer.deserialize(inView);
+				final SimpleUser deserialized = serializer.deserialize(inView);
 
 				// deterministically generate a reference record
-				final User reference = TestDataGenerator.generateRandomUser(rnd);
+				final SimpleUser reference = TestDataGenerator.generateRandomSimpleUser(rnd);
 
 				assertEquals(reference, deserialized);
 			}
@@ -141,9 +145,9 @@ public class BackwardsCompatibleAvroSerializerTest {
 // run this code to generate the test data
 //	public static void main(String[] args) throws Exception {
 //
-//		AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);
+//		AvroTypeInfo<SimpleUser> typeInfo = new AvroTypeInfo<>(SimpleUser.class);
 //
-//		TypeSerializer<User> serializer = typeInfo.createPojoSerializer(new ExecutionConfig());
+//		TypeSerializer<SimpleUser> serializer = typeInfo.createPojoSerializer(new ExecutionConfig());
 //		TypeSerializerConfigSnapshot confSnapshot = serializer.snapshotConfiguration();
 //
 //		try (FileOutputStream fos = new FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) {
@@ -160,7 +164,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 //			final Random rnd = new Random(RANDOM_SEED);
 //
 //			for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-//				serializer.serialize(TestDataGenerator.generateRandomUser(rnd), out);
+//				serializer.serialize(TestDataGenerator.generateRandomSimpleUser(rnd), out);
 //			}
 //		}
 //	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index ce23ccc..9d77f32 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -18,87 +18,44 @@
 
 package org.apache.flink.formats.avro.utils;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
+import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 
 /**
  * Utilities for creating Avro Schemas.
  */
 public final class AvroTestUtils {
 
-	private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
-
-	/**
-	 * Creates a flat Avro Schema for testing.
-	 */
-	public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation[] fieldTypes) {
-		final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder
-			.record("BasicAvroRecord")
-			.namespace(NAMESPACE)
-			.fields();
-
-		final Schema nullSchema = Schema.create(Schema.Type.NULL);
-
-		for (int i = 0; i < fieldNames.length; i++) {
-			Schema schema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass());
-			Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, schema));
-			fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault();
-		}
-
-		return fieldAssembler.endRecord();
-	}
-
 	/**
-	 * Tests a simple Avro data types without nesting.
+	 * Tests all Avro data types as well as nested types for a specific record.
 	 */
-	public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getSimpleTestData() {
-		final Address addr = Address.newBuilder()
-			.setNum(42)
-			.setStreet("Main Street 42")
-			.setCity("Test City")
-			.setState("Test State")
-			.setZip("12345")
-			.build();
-
-		final Row rowAddr = new Row(5);
-		rowAddr.setField(0, 42);
-		rowAddr.setField(1, "Main Street 42");
-		rowAddr.setField(2, "Test City");
-		rowAddr.setField(3, "Test State");
-		rowAddr.setField(4, "12345");
-
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
-		t.f0 = Address.class;
-		t.f1 = addr;
-		t.f2 = rowAddr;
-
-		return t;
-	}
-
-	/**
-	 * Tests all Avro data types as well as nested types.
-	 */
-	public static Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> getComplexTestData() {
+	public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSpecificTestData() {
 		final Address addr = Address.newBuilder()
 			.setNum(42)
 			.setStreet("Main Street 42")
@@ -122,17 +79,30 @@ public final class AvroTestUtils {
 			.setTypeDoubleTest(1.337d)
 			.setTypeNullTest(null)
 			.setTypeBoolTest(false)
-			.setTypeArrayString(new ArrayList<CharSequence>())
-			.setTypeArrayBoolean(new ArrayList<Boolean>())
+			.setTypeArrayString(Arrays.asList("hello", "world"))
+			.setTypeArrayBoolean(Arrays.asList(true, true, false))
 			.setTypeNullableArray(null)
 			.setTypeEnum(Colors.RED)
-			.setTypeMap(new HashMap<CharSequence, Long>())
-			.setTypeFixed(null)
-			.setTypeUnion(null)
+			.setTypeMap(Collections.singletonMap("test", 12L))
+			.setTypeFixed(new Fixed16(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
+			.setTypeUnion(12.0)
 			.setTypeNested(addr)
+			.setTypeBytes(ByteBuffer.allocate(10))
+			.setTypeDate(LocalDate.parse("2014-03-01"))
+			.setTypeTimeMillis(LocalTime.parse("12:12:12"))
+			.setTypeTimeMicros(123456)
+			.setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+			.setTypeTimestampMicros(123456L)
+			// byte array must contain the two's-complement representation of the
+			// unscaled integer value in big-endian byte order
+			.setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+			// array of length n can store at most
+			// Math.floor(Math.log10(Math.pow(2, 8 * n - 1) - 1))
+			// base-10 digits of precision
+			.setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
 			.build();
 
-		final Row rowUser = new Row(15);
+		final Row rowUser = new Row(23);
 		rowUser.setField(0, "Charlie");
 		rowUser.setField(1, null);
 		rowUser.setField(2, "blue");
@@ -140,16 +110,24 @@ public final class AvroTestUtils {
 		rowUser.setField(4, 1.337d);
 		rowUser.setField(5, null);
 		rowUser.setField(6, false);
-		rowUser.setField(7, new ArrayList<CharSequence>());
-		rowUser.setField(8, new ArrayList<Boolean>());
+		rowUser.setField(7, new String[]{"hello", "world"});
+		rowUser.setField(8, new Boolean[]{true, true, false});
 		rowUser.setField(9, null);
-		rowUser.setField(10, Colors.RED);
-		rowUser.setField(11, new HashMap<CharSequence, Long>());
-		rowUser.setField(12, null);
-		rowUser.setField(13, null);
+		rowUser.setField(10, "RED");
+		rowUser.setField(11, Collections.singletonMap("test", 12L));
+		rowUser.setField(12, new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16});
+		rowUser.setField(13, 12.0);
 		rowUser.setField(14, rowAddr);
-
-		final Tuple3<Class<? extends SpecificRecordBase>, SpecificRecord, Row> t = new Tuple3<>();
+		rowUser.setField(15, new byte[10]);
+		rowUser.setField(16, Date.valueOf("2014-03-01"));
+		rowUser.setField(17, Time.valueOf("12:12:12"));
+		rowUser.setField(18, 123456);
+		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
+		rowUser.setField(20, 123456L);
+		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
+		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
+
+		final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
 		t.f0 = User.class;
 		t.f1 = user;
 		t.f2 = rowUser;
@@ -158,6 +136,109 @@ public final class AvroTestUtils {
 	}
 
 	/**
+	 * Tests almost all Avro data types as well as nested types for a generic record.
+	 */
+	public static Tuple3<GenericRecord, Row, Schema> getGenericTestData() {
+		final String schemaString =
+			"{\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\"," +
+			"\"fields\": [{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]}," +
+			"{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}" +
+			",{\"name\":\"type_double_test\",\"type\":\"double\"},{\"name\":\"type_null_test\",\"type\":[\"null\"]}," +
+			"{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":" +
+			"{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\"," +
+			"\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\"," +
+			"\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\"," +
+			"\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\"," +
+			"\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"Fixed16\"," +
+			"\"size\":16}],\"size\":16},{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]}," +
+			"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"num\"," +
+			"\"type\":\"int\"},{\"name\":\"street\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"}," +
+			"{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"zip\",\"type\":\"string\"}]}]},{\"name\":\"type_bytes\"," +
+			"\"type\":\"bytes\"},{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}," +
+			"{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\"," +
+			"\"type\":{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\"," +
+			"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\"," +
+			"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\"," +
+			"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\"," +
+			"\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";
+		final Schema schema = new Schema.Parser().parse(schemaString);
+		GenericRecord addr = new GenericData.Record(schema.getField("type_nested").schema().getTypes().get(1));
+		addr.put("num", 42);
+		addr.put("street", "Main Street 42");
+		addr.put("city", "Test City");
+		addr.put("state", "Test State");
+		addr.put("zip", "12345");
+
+		final Row rowAddr = new Row(5);
+		rowAddr.setField(0, 42);
+		rowAddr.setField(1, "Main Street 42");
+		rowAddr.setField(2, "Test City");
+		rowAddr.setField(3, "Test State");
+		rowAddr.setField(4, "12345");
+
+		final GenericRecord user = new GenericData.Record(schema);
+		user.put("name", "Charlie");
+		user.put("favorite_number", null);
+		user.put("favorite_color", "blue");
+		user.put("type_long_test", 1337L);
+		user.put("type_double_test", 1.337d);
+		user.put("type_null_test", null);
+		user.put("type_bool_test", false);
+		user.put("type_array_string", Arrays.asList("hello", "world"));
+		user.put("type_array_boolean", Arrays.asList(true, true, false));
+		user.put("type_nullable_array", null);
+		user.put("type_enum", new GenericData.EnumSymbol(schema.getField("type_enum").schema(), "RED"));
+		user.put("type_map", Collections.singletonMap("test", 12L));
+		user.put("type_fixed", new Fixed16(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}));
+		user.put("type_union", 12.0);
+		user.put("type_nested", addr);
+		user.put("type_bytes", ByteBuffer.allocate(10));
+		user.put("type_date", LocalDate.parse("2014-03-01"));
+		user.put("type_time_millis", LocalTime.parse("12:12:12"));
+		user.put("type_time_micros", 123456);
+		user.put("type_timestamp_millis", DateTime.parse("2014-03-01T12:12:12.321Z"));
+		user.put("type_timestamp_micros", 123456L);
+		user.put("type_decimal_bytes",
+			ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+		user.put("type_decimal_fixed",
+			new GenericData.Fixed(
+				schema.getField("type_decimal_fixed").schema(),
+				BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+
+		final Row rowUser = new Row(23);
+		rowUser.setField(0, "Charlie");
+		rowUser.setField(1, null);
+		rowUser.setField(2, "blue");
+		rowUser.setField(3, 1337L);
+		rowUser.setField(4, 1.337d);
+		rowUser.setField(5, null);
+		rowUser.setField(6, false);
+		rowUser.setField(7, new String[]{"hello", "world"});
+		rowUser.setField(8, new Boolean[]{true, true, false});
+		rowUser.setField(9, null);
+		rowUser.setField(10, "RED");
+		rowUser.setField(11, Collections.singletonMap("test", 12L));
+		rowUser.setField(12, new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16});
+		rowUser.setField(13, 12.0);
+		rowUser.setField(14, rowAddr);
+		rowUser.setField(15, new byte[10]);
+		rowUser.setField(16, Date.valueOf("2014-03-01"));
+		rowUser.setField(17, Time.valueOf("12:12:12"));
+		rowUser.setField(18, 123456);
+		rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
+		rowUser.setField(20, 123456L);
+		rowUser.setField(21, BigDecimal.valueOf(2000, 2));
+		rowUser.setField(22, BigDecimal.valueOf(2000, 2));
+
+		final Tuple3<GenericRecord, Row, Schema> t = new Tuple3<>();
+		t.f0 = user;
+		t.f1 = rowUser;
+		t.f2 = schema;
+
+		return t;
+	}
+
+	/**
 	 * Writes given record using specified schema.
 	 * @param record record to serialize
 	 * @param schema schema to use for serialization

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index 9205627..a4c5bf8 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -21,8 +21,16 @@ package org.apache.flink.formats.avro.utils;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
+import org.apache.flink.formats.avro.generated.SimpleUser;
 import org.apache.flink.formats.avro.generated.User;
 
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,7 +57,35 @@ public class TestDataGenerator {
 				new HashMap<>(),
 				generateRandomFixed16(rnd),
 				generateRandomUnion(rnd),
-				generateRandomAddress(rnd));
+				generateRandomAddress(rnd),
+				generateRandomBytes(rnd),
+				LocalDate.parse("2014-03-01"),
+				LocalTime.parse("12:12:12"),
+				123456,
+				DateTime.parse("2014-03-01T12:12:12.321Z"),
+				123456L,
+				ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
+				new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+	}
+
+	public static SimpleUser generateRandomSimpleUser(Random rnd) {
+		return new SimpleUser(
+				generateRandomString(rnd, 50),
+				rnd.nextBoolean() ? null : rnd.nextInt(),
+				rnd.nextBoolean() ? null : generateRandomString(rnd, 6),
+				rnd.nextBoolean() ? null : rnd.nextLong(),
+				rnd.nextDouble(),
+				null,
+				rnd.nextBoolean(),
+				generateRandomStringList(rnd, 20, 30),
+				generateRandomBooleanList(rnd, 20),
+				rnd.nextBoolean() ? null : generateRandomStringList(rnd, 20, 20),
+				generateRandomColor(rnd),
+				new HashMap<>(),
+				generateRandomFixed16(rnd),
+				generateRandomUnion(rnd),
+				generateRandomAddress(rnd),
+				generateRandomBytes(rnd));
 	}
 
 	public static Colors generateRandomColor(Random rnd) {
@@ -76,6 +112,12 @@ public class TestDataGenerator {
 				generateRandomString(rnd, 20));
 	}
 
+	public static ByteBuffer generateRandomBytes(Random rnd) {
+		final byte[] bytes = new byte[10];
+		rnd.nextBytes(bytes);
+		return ByteBuffer.wrap(bytes);
+	}
+
 	private static List<Boolean> generateRandomBooleanList(Random rnd, int maxEntries) {
 		final int num = rnd.nextInt(maxEntries + 1);
 		ArrayList<Boolean> list = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
index 2345553..342b32c 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/descriptors/AvroTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.ValidationException;
 
 import org.junit.Test;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,12 +38,20 @@ public class AvroTest extends DescriptorTestBase {
 		removePropertyAndVerify(descriptors().get(0), "format.record-class");
 	}
 
+	@Test(expected = ValidationException.class)
+	public void testRecordClassAndAvroSchema() {
+		addPropertyAndVerify(descriptors().get(0), "format.avro-schema", "{...}");
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public List<Descriptor> descriptors() {
 		final Descriptor desc1 = new Avro().recordClass(User.class);
-		return Collections.singletonList(desc1);
+
+		final Descriptor desc2 = new Avro().avroSchema("{...}");
+
+		return Arrays.asList(desc1, desc2);
 	}
 
 	@Override
@@ -53,7 +61,12 @@ public class AvroTest extends DescriptorTestBase {
 		props1.put("format.property-version", "1");
 		props1.put("format.record-class", "org.apache.flink.formats.avro.generated.User");
 
-		return Collections.singletonList(props1);
+		final Map<String, String> props2 = new HashMap<>();
+		props2.put("format.type", "avro");
+		props2.put("format.property-version", "1");
+		props2.put("format.avro-schema", "{...}");
+
+		return Arrays.asList(props1, props2);
 	}
 
 	@Override